ServiceFactory
这个类的作用就是服务的注册发布和get的最外层接口
内部有两个主要的类
1 2
| private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy(); private static PublishPolicy publishPolicy = PublishPolicyLoader.getPublishPolicy();
|
ServiceProxy
用来获取service
的,就是包一层proxy
吧。
PublishPolicy
简单说就是发布service
的,但是内部还是借助于ServicePublisher
进行操作的。
RegistryManager
这个类的作用其实是为ServicePublisher
的addService
到zookeeper
中和unpublish
指定的service
提供API支持的。
更通俗点就是为invoker
和provider
提供了zookeeper
的交流的。
但是内部还是使用的registry
类进行和zookeeper
调用的。
1 2 3 4 5 6 7 8 9 10 11
| public interface Registry { boolean isSupportNewProtocol(String serviceAddress, String serviceName) throws RegistryException; void setSupportNewProtocol(String serviceAddress, String serviceName, boolean support) throws RegistryException; RegistryConfig getRegistryConfig(String ip) throws RegistryException; }
|
InvokerBootStrap
这个类看名字也知道是为了invoker
提供方便的。
看调用也是AbstractServiceProxy
这个类调用的最多
主要有两个方法,startup
和shutdown
,也没什么重要的成员变量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void startup() { if (!isStartup) { synchronized (InvokerBootStrap.class) { if (!isStartup) { ServiceInvocationRepository.getInstance().init(); InvokerProcessHandlerFactory.init(); SerializerFactory.init(); LoadBalanceManager.init(); RegionPolicyManager.INSTANCE.init(); Monitor monitor = MonitorLoader.getMonitor(); if (monitor != null) { monitor.init(); } isStartup = true; logger.warn("pigeon client[version:" + VersionUtils.VERSION + "] has been started"); } } } }
|
其中ServiceInvocationRepository
的init
方法主要是启动了一个InvocationTimeoutListener
线程,看样子是用来检测调用超时问题。
在InvokerProcessHandlerFactory
的init
方法中,就是在service
的proxy
中,加入了一条责任链。
1 2 3 4 5 6 7
| registerBizProcessFilter(new TraceFilter()); registerBizProcessFilter(new DegradationFilter()); registerBizProcessFilter(new ClusterInvokeFilter()); registerBizProcessFilter(new GatewayInvokeFilter()); registerBizProcessFilter(new ContextPrepareInvokeFilter()); registerBizProcessFilter(new SecurityFilter()); registerBizProcessFilter(new RemoteCallInvokeFilter());
|
SerializerFactory
见下
LoadBalanceManager
的init
方法,初始化了pigeon
支持的四种负载均衡的算法。
RandomLoadBalance
AutoawareLoadBalance
RoundRobinLoadBalance
WeightedAutoawareLoadBalance
在RegionPolicyManager
的init
方法中,就是对Region
的选择算法进行了初始化。
那个monitor
的方法,我看好像还没写好。。。。
ProviderBootStrap
初始化
包含了SerializerFactory
和ProviderProcessHandlerFactory
的初始化。
同时对jetty server
和netty server
两个server
进行了初始化和启动。
同时启动了一个监听服务器的线程ShutdownHookListener
如果服务器关闭,那么就会调用一些类的关闭方法
1 2 3 4
| ServiceFactory.unpublishAllServices(); InvokerBootStrap.shutdown(); ProviderBootStrap.shutdown();
|
关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void shutdown() { for (Server server : serversMap.values()) { if (server != null) { logger.info("start to stop " + server); try { unregisterConsoleServer(server.getServerConfig()); server.stop(); } catch (Throwable e) { } if (logger.isInfoEnabled()) { logger.info(server + " has been shutdown"); } } } try { ProviderProcessHandlerFactory.destroy(); } catch (Throwable e) { } }
|
把server
从zookeeper
上下掉,这个过程调用了RegistryManager
的方法。
然后调用了ProviderProcessHandlerFactory
的destroy
方法。
SerializerFactory
对框架支持的序列化方法进行了注册。
内部维护了两个map
1 2
| ConcurrentHashMap<String, Byte> serializerTypes; ConcurrentHashMap<Byte, Serializer> serializers
|
这个Serializer类有啥用呢,这个其实还值得我们去研究研究。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public interface Serializer { Object deserializeRequest(InputStream is) throws SerializationException; void serializeRequest(OutputStream os, Object obj) throws SerializationException; Object deserializeResponse(InputStream is) throws SerializationException; void serializeResponse(OutputStream os, Object obj) throws SerializationException; Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException; InvocationResponse newResponse() throws SerializationException; InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException; }
|
Serializer
接口中有七个方法,都是和序列化有关,其中proxyRequest
方法请看pigeon的服务注册与发布.
我们进行方法调用,发送请求,获取调用结果。这中间的过程都是需要进行序列化的。
pigeon
支持11种序列化方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public enum SerializerType { INTERNAL_THRIFT((byte) 1, "internalThrift"), HESSIAN((byte) 2, "hessian"), JAVA((byte) 3, "java"), PROTO((byte) 5, "proto"), HESSIAN1((byte) 6, "hessian1"), JSON((byte) 7, "json"), FST((byte) 8, "fst"), PROTOBUF((byte) 9, "protobuf"), THRIFT((byte) 10, "thrift"), PROTOBUF3((byte) 11, "protobuf3");
|
同时有八个进行序列化操作的实体类。
ProviderProcessHandlerFactory
我们知道在InvokerProcessHandlerFactory
中,给invoke
的过程加上了一天责任链。
那么这个的init
方法就是给Provide
的时候,也加上一条责任链。
1 2 3 4 5 6 7 8 9 10
| registerBizProcessFilter(new TraceFilter()); if (Constants.MONITOR_ENABLE) { registerBizProcessFilter(new MonitorProcessFilter()); } registerBizProcessFilter(new WriteResponseProcessFilter()); registerBizProcessFilter(new ContextTransferProcessFilter()); registerBizProcessFilter(new ExceptionProcessFilter()); registerBizProcessFilter(new SecurityFilter()); registerBizProcessFilter(new GatewayProcessFilter()); registerBizProcessFilter(new BusinessProcessFilter());
|