Spark-源码学习-通信服务-架构设计-传输层-TransportChannelHandler
一、概述
TransportChannelHandler 实现了 Netty 的 ChannelInboundHandler,以便对 Netty 管道中的消息进行处理。当 TransportChannelHandler 读取到的 request 是 RequestMessage 类型时,则将此消息的处理进一步交给 TransportRequestHandler,当 request 是 ResponseMessage 时,则将此消息的处理进一步交给 TransporResponseHandler。
由协议层的实现知道,最终的消息实现类都直接或间接地实现了 RequestMessage 或 ResponseMessage 接口,在 RPC 传输层中创建的所有通道都是双向的。当客户端使用 RequestMessage 启动 Netty 通道(由服务端的 TransportRequestHandler 处理)时,服务端也会生成 ResponseMessage (由客户端的 TransportRequestHandler 处理)。
服务端也会在同一个 Channel 上获取句柄,向客户端发送 RequestMessages。所以客户端还需要一个 RequestHandler,而 Server 需要一个 ResponselHandler,用于客户端对服务端请求的响应。
二、MessageHandler
TransportRequestHandler 与 TransportResponseHandler 都继承自抽象类 MessageHandler, MessageHandler 定义了子类的规范。
- $handle()$: 用于对接收到的单个消息进行处理。
- $channelActive()$: 当 channel 激活时调用。
- $exceptionCaught()$: 当捕获到 channel 发生异常时调用。
- $channelInactive()$: 当 channel 非激活时调用。
2.1. TransportRequestHandler
Server 端处理 Client 请求的处理程序,把请求消息交给 RpcHandler 进行进一步处理。
1 |
|
$processFetchRequest()$ 方法用于处理 ChunkFetchRequest 类型的消息
$processRpcRequest()$ 方法用于处理 RpcRequest 类型的消息
将 RpcRequest 消息的内容体、发送消息的客户端及一个 RpcResponseCallback 类型的匿名内部类作为参数传递给了 RpcHandler 的 $receive()$ 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private void processRpcRequest(final RpcRequest req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
public void onSuccess(ByteBuffer response) {
respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
}
public void onFailure(Throwable e) {
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
});
} catch (Exception e) {
//...
} finally {
req.body().release();
}
}所有继承 RpcHandler 的子类都需要在其 $receive()$ 方法的具体实现中回调 RpcResponseCallback 的 $onSuccess$(处理成功时)或者$onFailure$ (处理失败时)方法。
从 RpcResponseCallback 的实现来看,无论处理结果成功还是失败,都将调用 respond 方法对客户端进行响应。
$processStreamRequest()$ 方法用于处理 StreamRequest 类型的消息
$processOneWayMessage()$ 方法用于处理无需回复的 RPC 请求
$processOneWayMessage()$ 方法的实现与 $processRpcRequest()$ 非常相似,区别在于 $processOneWayMessage()$ 调用了 只接收 TransportClient 和 ByteBuffer 两个参数的 $receive$ 方法,其 RpcResponseCallback 为默认的 ONE_WAY_CALLBACK
因而 $processOneWayMessage()$ 在处理完 RPC 请求后不会对客户端作出响应。
1
2
3
4
5
6
7
8
9private void processOneWayMessage(OneWayMessage req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
} finally {
req.body().release();
}
}
2.2.1. RpcHandler
RpcHandler 是一个抽象类,定义了一些 RPC 处理器的规范。
receive
这是一个抽象方法,用来接收单一的 RPC 消息,具体处理逻辑需要子类去实现。
receive 接收 3 个参数,分别是 TransportClient、ByteBuffer 和 RpcResponseCallback。RpcResponseCallback 用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback 都会被调用一次。
RpcResponseCallback 的接口定义如下。
重载的 receive
只接收 TransportClient 和 ByteBuffer 两个参数,RpcResponseCallback 为默认的 ONE_WAY_CALLBACK,其类型为OneWayRpcCallback,OneWayRpcCallback 其 onSuccess 和 onFailure 只是打印日志,并没有针对客户端做回复处理。
channelActive
当与给定客户端相关联的 channel 处于活动状态时调用。
channelInactive
当与给定客户端相关联的channel处于非活动状态时调用。
exceptionCaught
当 channel 产生异常时调用。
getStreamManager
获取 StreamManager, StreamManager 可以从流中获取单个的块,因此它也包含着当前正在被 TransportClient 获取的流的状态
2.3. TransporResponseHandler
Client 端处理 Server 响应的处理程序,在 Client 端发送消息时,根据发送消息的类型调用 TransportResponseHandler 中的方法注册回调函数,回调函数和请求信息放入相应的缓存中。待 TransportResponseHandler 收到 Server 端的响应消息时,再调用主要的工作方法 $handle()$ ,根据响应消息类型从对应缓存中取出回调函数并调用。