感受学习业内优秀开源分布式框架的底层rpc实现。
调用过程大致可以分为六个阶段,这里只贴出服务调用各个阶段的调用栈进行备忘,详细源码分析请点击原文链接进行阅读
1服务消费方(dubbo-consumer)发布请求
调用栈
proxy0#sayHello(String)
—> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation)
—> AbstractClusterInvoker#invoke(Invocation)
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation)
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int)
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean)
—> NioClientSocketChannel#write(Object)
2请求(request)编码
dubbo数据包
请求头
偏移量(Bit) | 字段 | 取值 |
---|---|---|
0 ~ 7 | 魔数高位 | 0xda00 |
8 ~ 15 | 魔数低位 | 0xbb |
16 | 数据包类型 | 0 - Response, 1 - Request |
17 | 调用方式 | 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用 |
18 | 事件标识 | 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包 |
19 ~ 23 | 序列化器编号 | 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization |
24 ~ 31 | 状态 | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE … |
32 ~ 95 | 请求编号 | 共8字节,运行时生成 |
96 ~ 127 | 消息体长度 | 运行时计算 |
调用栈
ExchangeCodec#encode(Channel channel, ChannelBuffer buffer, Object msg)
->ExchangeCodec#encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
->DubboCodec#encodeRequestData(Channel channel, ObjectOutput out, Object data, String version)
3服务提供方(dubbo-provider)解码请求
调用栈
ExchangeCodec#decode(Channel channel, ChannelBuffer buffer)
->ExchangeCodec#decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
->DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
->DecodeableRpcInvocation#decode
4服务提供方调用服务
提供方处理请求的线程模型
解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该对象会被依次传递给 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。
调用栈
NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
—> AbstractPeer#received(Channel, Object)
—> MultiMessageHandler#received(Channel, Object)
—> HeartbeatHandler#received(Channel, Object)
—> AllChannelHandler#received(Channel, Object)
—> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑
在不同的子线程里进行实际的服务调用,整个调用栈如下
ChannelEventRunnable#run()
—> DecodeHandler#received(Channel, Object)
—> HeaderExchangeHandler#received(Channel, Object)
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
—> Filter#invoke(Invoker, Invocation)
—> AbstractProxyInvoker#invoke(Invocation)
—> Wrapper0#invokeMethod(Object, String, Class[], Object[])
—> DemoServiceImpl#sayHello(String)
5服务提供方返回调用结果(response)
服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回。
调用栈
ExchangeCodec#(Channel channel, ChannelBuffer buffer, Object msg)
->ExchangeCodec#encodeResponse(Channel channel, ChannelBuffer buffer, Response res)
->DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data, String version)
6服务消费方接收调用结果
服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。
响应数据解码-调用栈
DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
->DecodeableRpcResult#DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id)
->DecodeableRpcResult#decode()
->DecodeableRpcResult#decode(Channel channel, InputStream input)
解码接收到的数据的线程与rpc请求最初发起的线程必定不是同一个线程,所以最后要解决的问题就是如何将调用结果传递给用户线程。dubbo设计了一个类似于 java.util.concurrent.Future的ResponseFuture(具体实现类为DefaultFuture)。使用ReentrantLock进行线程通讯。在用户线程发起请求时会调用condition.awit()对用户线程进行阻塞,在接收到响应结果后反序列化并塞入DefaultFuture的response字段,此时调用condition.signal()唤醒用户线程,用户线程便可拿到序列化后的结果。更具体的实现推荐直接阅读源码,这里只给出调用栈。
序列化结果传递-调用栈
HeaderExchangeHandler#received(Channel channel, Object message)
->HeaderExchangeHandler#handleResponse(Channel channel, Response response)
->DefaultFuture#received(Channel channel, Response response)
->DefaultFuture#doReceived(Response res)
一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: