0%

Pigeon中的核心类和作用

ServiceFactory

这个类的作用就是服务的注册发布和get的最外层接口

内部有两个主要的类

1
2
private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();
private static PublishPolicy publishPolicy = PublishPolicyLoader.getPublishPolicy();

ServiceProxy用来获取service的,就是包一层proxy吧。

PublishPolicy简单说就是发布service的,但是内部还是借助于ServicePublisher进行操作的。

RegistryManager

这个类的作用其实是为ServicePublisheraddServicezookeeper中和unpublish指定的service提供API支持的。

更通俗点就是为invokerprovider提供了zookeeper的交流的。

但是内部还是使用的registry类进行和zookeeper调用的。

1
2
3
4
5
6
7
8
9
10
11
public interface Registry {
//删除了好多,可以自己去看源码
// for invoker
boolean isSupportNewProtocol(String serviceAddress, String serviceName) throws RegistryException;

// for provider
void setSupportNewProtocol(String serviceAddress, String serviceName, boolean support) throws RegistryException;

// for invoker/provider
RegistryConfig getRegistryConfig(String ip) throws RegistryException;
}

InvokerBootStrap

这个类看名字也知道是为了invoker提供方便的。

看调用也是AbstractServiceProxy这个类调用的最多

主要有两个方法,startupshutdown,也没什么重要的成员变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void startup() {
if (!isStartup) {
synchronized (InvokerBootStrap.class) {
if (!isStartup) {
ServiceInvocationRepository.getInstance().init();
InvokerProcessHandlerFactory.init();
SerializerFactory.init();
LoadBalanceManager.init();
RegionPolicyManager.INSTANCE.init();
Monitor monitor = MonitorLoader.getMonitor();
if (monitor != null) {
monitor.init();
}
isStartup = true;
logger.warn("pigeon client[version:" + VersionUtils.VERSION + "] has been started");
}
}
}
}

其中ServiceInvocationRepositoryinit方法主要是启动了一个InvocationTimeoutListener线程,看样子是用来检测调用超时问题。

InvokerProcessHandlerFactoryinit方法中,就是在serviceproxy中,加入了一条责任链。

1
2
3
4
5
6
7
registerBizProcessFilter(new TraceFilter());
registerBizProcessFilter(new DegradationFilter());
registerBizProcessFilter(new ClusterInvokeFilter());
registerBizProcessFilter(new GatewayInvokeFilter());
registerBizProcessFilter(new ContextPrepareInvokeFilter());
registerBizProcessFilter(new SecurityFilter());
registerBizProcessFilter(new RemoteCallInvokeFilter());

SerializerFactory见下

LoadBalanceManagerinit方法,初始化了pigeon支持的四种负载均衡的算法。

  • RandomLoadBalance
  • AutoawareLoadBalance
  • RoundRobinLoadBalance
  • WeightedAutoawareLoadBalance

RegionPolicyManagerinit方法中,就是对Region的选择算法进行了初始化。

那个monitor的方法,我看好像还没写好。。。。

ProviderBootStrap

初始化

包含了SerializerFactoryProviderProcessHandlerFactory的初始化。

同时对jetty servernetty server两个server进行了初始化和启动。

同时启动了一个监听服务器的线程ShutdownHookListener

如果服务器关闭,那么就会调用一些类的关闭方法

1
2
3
4
//从zookeeper上unpublish掉所有的服务
ServiceFactory.unpublishAllServices();
InvokerBootStrap.shutdown();
ProviderBootStrap.shutdown();

关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void shutdown() {
for (Server server : serversMap.values()) {
if (server != null) {
logger.info("start to stop " + server);
try {
unregisterConsoleServer(server.getServerConfig());
server.stop();
} catch (Throwable e) {
}
if (logger.isInfoEnabled()) {
logger.info(server + " has been shutdown");
}
}
}
try {
ProviderProcessHandlerFactory.destroy();
} catch (Throwable e) {
}
}

serverzookeeper上下掉,这个过程调用了RegistryManager的方法。

然后调用了ProviderProcessHandlerFactorydestroy方法。

SerializerFactory

对框架支持的序列化方法进行了注册。

内部维护了两个map

1
2
ConcurrentHashMap<String, Byte> serializerTypes;
ConcurrentHashMap<Byte, Serializer> serializers

这个Serializer类有啥用呢,这个其实还值得我们去研究研究。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Serializer {

Object deserializeRequest(InputStream is) throws SerializationException;

void serializeRequest(OutputStream os, Object obj) throws SerializationException;

Object deserializeResponse(InputStream is) throws SerializationException;

void serializeResponse(OutputStream os, Object obj) throws SerializationException;

Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException;

InvocationResponse newResponse() throws SerializationException;

InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException;
}

Serializer接口中有七个方法,都是和序列化有关,其中proxyRequest方法请看pigeon的服务注册与发布.

我们进行方法调用,发送请求,获取调用结果。这中间的过程都是需要进行序列化的。

pigeon支持11种序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public enum SerializerType {

INTERNAL_THRIFT((byte) 1, "internalThrift"),

HESSIAN((byte) 2, "hessian"),

JAVA((byte) 3, "java"),

PROTO((byte) 5, "proto"),

HESSIAN1((byte) 6, "hessian1"),

JSON((byte) 7, "json"),

FST((byte) 8, "fst"),

PROTOBUF((byte) 9, "protobuf"),

THRIFT((byte) 10, "thrift"),

PROTOBUF3((byte) 11, "protobuf3");

同时有八个进行序列化操作的实体类。

ProviderProcessHandlerFactory

我们知道在InvokerProcessHandlerFactory中,给invoke的过程加上了一天责任链。

那么这个的init方法就是给Provide的时候,也加上一条责任链。

1
2
3
4
5
6
7
8
9
10
registerBizProcessFilter(new TraceFilter());
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new MonitorProcessFilter());
}
registerBizProcessFilter(new WriteResponseProcessFilter());
registerBizProcessFilter(new ContextTransferProcessFilter());
registerBizProcessFilter(new ExceptionProcessFilter());
registerBizProcessFilter(new SecurityFilter());
registerBizProcessFilter(new GatewayProcessFilter());
registerBizProcessFilter(new BusinessProcessFilter());