前言
pigeon
是大众点评内部一直在使用的rpc
框架,同时带有服务治理的功能。
组件是使用zookeeper
+ netty
+ jetty
完成。
现在也已经开源
但是开源似乎做的不用心,可能也是看dubbo
很火热,pigeon
没人用
而且pigeon
和点评内部的一点闭源的框架联系紧密,所以用的人并不多(反正我是一个没看到)。
使用
demo
Server端
pigeon
是与spring
进行了很重的耦合的,如果要使用pigeon
,那么必须使用spring
框架。
如果要暴露我们的服务
比如创建一个简单的UserService
接口和实现。
1 | public interface UserService { |
由于pigeon
的服务注册和发现是依赖zookeeper
的,我们还需要装载一个zookeeper
在resources文件夹下创建config
文件夹,在config
文件夹下创建pigeon.properties
文件
在其中写上zookeeper的ip和端口1
pigeon.registry.address=localhost:2181
同时为了区分服务,需要给我们的服务起一个名字
在sources
的META-INF
文件夹的app.properties
里写上1
pigeon.registry.address=localhost:2181
如果使用typical
的方法声明服务的话,我们创建一个spring
的配置文件,假设就叫spring.xml
1
2
3
4
5
6
7
8
9
10<bean id="userServiceImpl" class="cc.lovezhy.service.UserServiceImpl"/>
<bean class="com.dianping.pigeon.remoting.provider.config.spring.ServiceBean" init-method="init">
<property name="services">
<map>
<entry
key="cc.lovezhy.service.UserService" value-ref="userServiceImpl"/>
</map>
</property>
</bean>
这时候,就可以直接用Main方法启动了1
2
3
4
5
6
7public class Main {
public static void main(String[] args) throws Exception {
SpringContainer container = new SpringContainer("classpath*:/META-INF/spring.xml");
container.start();
System.in.read();
}
}
当然如果你想要部署到tomcat
中也是可以的。
同时默认在localhost:4080/services
上我们可以看到当前我们暴露出去的服务
并且可以调用。
client端
client
端只需要包含服务的接口就行了。
所以service
端的代码编写都是分模块写的,一个模块专门提供DTO
和service
接口
打包成maven
供其他人使用。
然后具体的实现我们再在另外一个模块里写。
resources
和META-INF
的内容和服务端一样的。
就是在bean
的声明的时候,我们需要这样1
2
3
4<bean id="userService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean"
init-method="init">
<property name="interfaceName" value="cc.lovezhy.service.UserService"/>
</bean>
然后我们就可以在Main
方法中引用了
1 | public class Main { |
配置客户端调用模式
在pigeon
内部,客户端调用远程服务有4种模式
sync
同步future
异步callback
也是异步,只是通过回调的方式来处理结果oneway
不需要回复
例如spring
编程方式下只需要配置callType
属性:
1 | <bean id="babyAccountService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init"> |
官方文档解释的很清楚了所有的选项。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20<bean id="echoService"
class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean"
init-method="init">
<!-- 服务全局唯一的标识url,默认是服务接口类名,必须设置 -->
<property name="url" value="http://service.dianping.com/demoService/echoService_1.0.0" />
<!-- 接口名称,必须设置 -->
<property name="interfaceName" value="com.dianping.pigeon.demo.EchoService" />
<!-- 超时时间,毫秒,默认5000,建议自己设置 -->
<property name="timeout" value="2000" />
<!-- 序列化,hessian/fst/protostuff,默认hessian,可不设置-->
<property name="serialize" value="hessian" />
<!-- 调用方式,sync/future/callback/oneway,默认sync,可不设置 -->
<property name="callType" value="sync" />
<!-- 失败策略,快速失败failfast/失败转移failover/失败忽略failsafe/并发取最快返回forking,默认failfast,可不设置 -->
<property name="cluster" value="failfast" />
<!-- 是否超时重试,默认false,可不设置 -->
<property name="timeoutRetry" value="false" />
<!-- 重试次数,默认1,可不设置 -->
<property name="retries" value="1" />
</bean>
ReferenceBean的获取,init方法
这个我也画了一个简单的图,不过省略了很多细节部分,在InvokerBootStrap
部分和Serializer.proxyRequest
部分省略了很多。
下面具体看代码。
在我们声明bean
的时候还带有一个init-method
参数init-method="init"
意思是在这个bean
创建的时候还会调用一下ReferenceBean
的init
方法。
在init
方法中
1 | public void init() throws Exception { |
1 | InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType, |
InvokerConfig
参数
Class<T> serviceInterface
接口的Class类String url
服务全局唯一的标识url,感觉有了serviceInterface
就够了String version
版本?byte callMethod
就是call的方式,sync还是future之类,但是是byte类型的,1代表sync等String callType
callType就是callMethod的Stringbyte serialize
序列化方式,默认是hessianint timeout
超时时间InvocationCallback callback
设置了callback模式才有String suffix
String loadbalance
负载均衡的策略设置String routePolicy
路由的规则RoutePolicy routePolicyObj
boolean timeoutRetry
String cluster
int retries
失败了重试的次数String vip
int maxRequests
String protocol
Map<String, InvokerMethodConfig> methods
ClassLoader classLoader
String secret
String remoteAppKey
Object mock
上面还提到一个服务降级的问题,这个我们之后再说。
从上面看到,得到的service
是从ServiceFactory
这个类直接得到的。
在getService
的时候直接get
了
1 | //ServiceFactory这个类,主要就是管理service的加载,发布和获取的。 |
默认的话是建一个DefaultServiceProxy
它的调用是调用了父类的1
2
3
4
5
6public final class DefaultServiceProxy extends AbstractServiceProxy {
public <T> T getProxy(InvokerConfig<T> invokerConfig) {
return super.getProxy(invokerConfig);
}
}
在AbstractServiceProxy
中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53//这个其实是个缓存的map,看下面的get过程中有个加锁的过程
protected final static Map<InvokerConfig<?>, Object> services = new ConcurrentHashMap<InvokerConfig<?>, Object>();
public <T> T getProxy(InvokerConfig<T> invokerConfig) {
//...省略好多好多
Object service = null;
service = services.get(invokerConfig);
if (service == null) {
//这个锁,好像用的guava中一个东西,提供和String.intern相同的作用但是不会占用老年代空间?
//有时间再研究研究
synchronized (interner.intern(invokerConfig)) {
service = services.get(invokerConfig);
if (service == null) {
try {
//参见另外一篇文章
InvokerBootStrap.startup();
//下面这句就调用AbstractSerializer中的proxyRequest动态生成一个代理类
service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
if (StringUtils.isNotBlank(invokerConfig.getLoadbalance())) {
LoadBalanceManager.register(invokerConfig.getUrl(), invokerConfig.getSuffix(), invokerConfig.getLoadbalance());
}
} catch (Throwable t) {
throw new RpcException("error while trying to get service:" + invokerConfig, t);
}
try {
//配置地域策略,默认的是autoSwitch
routePolicyManager.register(invokerConfig.getUrl(), invokerConfig.getSuffix(),
invokerConfig.getRoutePolicy());
} catch (Throwable t) {
throw new RouteException("error while setup region route policy: " + invokerConfig, t);
}
// watch service config
try {
serviceConfigManager.register(invokerConfig.getUrl());
} catch (Throwable t) {
throw new ConfigException("error while trying to watch service config: " + invokerConfig, t);
}
try {
ClientManager.getInstance().registerClients(invokerConfig);
} catch (Throwable t) {
logger.warn("error while trying to setup service client:" + invokerConfig, t);
}
//加到Map缓存里
services.put(invokerConfig, service);
}
}
}
return (T) service;
}
在InvokerBootStrap.startup()
中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//有很多的init方法,说明都是和invoke相关的一些配件。
//这是处理调用超时问题的,把一个InvocationTimeoutListener跑在一个线程中
ServiceInvocationRepository.getInstance().init();
//初始化那个proxy的handle,下面会提到
InvokerProcessHandlerFactory.init();
//序列化工厂的初始化,默认支持很多序列化方式
SerializerFactory.init();
//负载均衡调度的初始化,默认支持四种
LoadBalanceManager.init();
//Region策略,就是分地域的策略,如果是北京上海都有服务,那么调用哪一边的问题。
RegionPolicyManager.INSTANCE.init();
Monitor monitor = MonitorLoader.getMonitor();
if (monitor != null) {
monitor.init();
}
isStartup = true;
得到服务的唯一标志,如果不指定url的话,默认就是接口的全称。1
2
3
4
5ServiceFactory.getServiceUrl(invokerConfig);
public static <T> String getServiceUrl(InvokerConfig<T> invokerConfig) {
String url = invokerConfig.getServiceInterface().getCanonicalName();
return url;
}
service代理对象1
2
3
4
5
6
7
8
9
10//proxyRequest得到一个proxy的对象
public abstract class AbstractSerializer implements Serializer {
public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
//第一个参数是ClassLoader,第二个参数是interface的数组,第三个参数是Proxy类
return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
new Class[] { invokerConfig.getServiceInterface() }, new ServiceInvocationProxy(invokerConfig,
InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
}
}
1 | //这里的handler是连接远程调用的桥梁 |
当我们调用其他的方法时,其实是在1
handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)
这里面进行处理的。
这里的handle是1
private ServiceInvocationHandler handler;
在InvokerProcessHandlerFactory
中得到一个实例
//这里应该是最重要的部分了1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61public final class InvokerProcessHandlerFactory {
private static List<InvocationInvokeFilter> bizProcessFilters = new LinkedList<InvocationInvokeFilter>();
private static ServiceInvocationHandler bizInvocationHandler = null;
private static volatile boolean isInitialized = false;
//这里进行了一个责任链的处理,类似于netty的那种
//在调用前进行日志,权限之类的分析。
//最后的RemoteCallInvokeFilter才是真正调用我们想要的方法
//初始化在之前就进行了,见上
public static void init() {
if (!isInitialized) {
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
}
//trace 监控信息
registerBizProcessFilter(new TraceFilter());
//服务降级
registerBizProcessFilter(new DegradationFilter());
registerBizProcessFilter(new ClusterInvokeFilter());
//网关,统计流量啥的
registerBizProcessFilter(new GatewayInvokeFilter());
registerBizProcessFilter(new ContextPrepareInvokeFilter());
//安全验证
registerBizProcessFilter(new SecurityFilter());
//通过Netty调用
registerBizProcessFilter(new RemoteCallInvokeFilter());
bizInvocationHandler = createInvocationHandler(bizProcessFilters);
isInitialized = true;
}
}
public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
return bizInvocationHandler;
}
"rawtypes" }) ({
private static <V extends ServiceInvocationFilter> ServiceInvocationHandler createInvocationHandler( List<V> internalFilters) {
ServiceInvocationHandler last = null;
List<V> filterList = new ArrayList<V>();
filterList.addAll(internalFilters);
//创建一个调用链
for (int i = filterList.size() - 1; i >= 0; i--) {
final V filter = filterList.get(i);
final ServiceInvocationHandler next = last;
last = new ServiceInvocationHandler() {
"unchecked") (
public InvocationResponse handle(InvocationContext invocationContext) throws Throwable {
InvocationResponse resp = filter.invoke(next, invocationContext);
return resp;
}
};
}
return last;
}
public static void registerBizProcessFilter(InvocationInvokeFilter filter) {
bizProcessFilters.add(filter);
}
}
在RemoteCallInvokeFilter
中invoke方法中,
调用的是InvokerUtils
的方法1
response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
1 | public static InvocationResponse sendRequest(Client client, InvocationRequest request, Callback callback) { |
client
的实现有两种,一个是Tcp
的还有一个是Http
的,pigeon
两种都支持。
好像一般的调用是调用的tcp
的方案,然后在4080/services
查看和调用的是http
的协议。
在Netty
的实现中的doWrite
方法,其实就是调用了channel
的write0
方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public InvocationResponse doWrite(InvocationRequest request) throws NetworkException {
NettyChannel channel = null;
try {
channel = channelPool.selectChannel();
ChannelFuture future = channel.write0(request);
afterWrite(request, channel);
if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE
|| request.getMessageType() == Constants.MESSAGE_TYPE_HEART) {
future.addListener(new MessageWriteListener(request, channel));
}
} catch (Exception e) {
throw new NetworkException("[doRequest] remote call failed:" + request, e);
}
return null;
}
至此,代理bean
的创建和invoke
的流程大概就理清楚了。
但是其实它是怎么和zookeeper
沟通拿到service
的呢。
我猜想肯定是从那条责任链的某个地方中取得的,于是进行了一番苦苦查找。
在ClusterFactory
的select
中,默认是返回FailfastCluster
,在它的invoke
方法中
1 | Client remoteClient = clientManager.getClient(invokerConfig, request, null); |
有这句话。
这个Client
默认是NettyClient
。
如果我们在仔细看一下ClientManager
的构造函数
1 | private ClientManager() { |
这里其实启动了一个ProviderAvailableListener
。
这个类和RegisterManager
关系密切,而RegisterManager
则是掌管和zookeeper
沟通的,由此不难看出,这里的service
就是从这儿得到的。
服务端集群策略
在ClusterInvokeFilter
中进行的配置
failfast
- 调用服务的一个节点失败后抛出异常返回,可以同时配置重试timeoutRetry和retries属性failover
- 调用服务的一个节点失败后会尝试调用另外的一个节点,可以同时配置重试 timeoutRetry和retries属性failsafe
- 调用服务的一个节点失败后不会抛出异常,返回null,后续版本会考虑按配置默认值返回forking
- 同时调用服务的所有可用节点,返回调用最快的节点结果数据。
可以通过配置forkingSize,指定最多调用的节点数(pigeon2.10.3及以上版本通过xml配置forkingSize,其余版本可以通过lion配置{appkey}.pigeon.invoker.forking.size)hedged
- 发出第一个请求后,如果hedgedDelay
时间内没有返回,会向其他节点发送第二个请求,返回最先返回的结果数据
ServiceBean的注册
(画了我好久的图)
具体的流程和ReferenceBean
的获取其实差不了太多。