一、概述

TransportChannelHandler 实现了 Netty 的 ChannelInboundHandler,以便对 Netty 管道中的消息进行处理。当 TransportChannelHandler 读取到的 request 是 RequestMessage 类型时,则将此消息的处理进一步交给 TransportRequestHandler,当 request 是 ResponseMessage 时,则将此消息的处理进一步交给 TransporResponseHandler。

由协议层的实现知道,最终的消息实现类都直接或间接地实现了 RequestMessage 或 ResponseMessage 接口,在 RPC 传输层中创建的所有通道都是双向的。当客户端使用 RequestMessage 启动 Netty 通道(由服务端的 TransportRequestHandler 处理)时,服务端也会生成 ResponseMessage (由客户端的 TransportRequestHandler 处理)。

服务端也会在同一个 Channel 上获取句柄,向客户端发送 RequestMessages。所以客户端还需要一个 RequestHandler,而 Server 需要一个 ResponselHandler,用于客户端对服务端请求的响应。

二、MessageHandler

TransportRequestHandler 与 TransportResponseHandler 都继承自抽象类 MessageHandler, MessageHandler 定义了子类的规范。

MessageHandler
  1. $handle()$: 用于对接收到的单个消息进行处理。
  2. $channelActive()$: 当 channel 激活时调用。
  3. $exceptionCaught()$: 当捕获到 channel 发生异常时调用。
  4. $channelInactive()$: 当 channel 非激活时调用。

2.1. TransportRequestHandler

Server 端处理 Client 请求的处理程序,把请求消息交给 RpcHandler 进行进一步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void handle(RequestMessage request) throws Exception {
if (request instanceof ChunkFetchRequest) {
chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else if (request instanceof UploadStream) {
processStreamUpload((UploadStream) request);
} else if (request instanceof MergedBlockMetaRequest) {
processMergedBlockMetaRequest((MergedBlockMetaRequest) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
  1. $processFetchRequest()$ 方法用于处理 ChunkFetchRequest 类型的消息

  2. $processRpcRequest()$ 方法用于处理 RpcRequest 类型的消息

    将 RpcRequest 消息的内容体、发送消息的客户端及一个 RpcResponseCallback 类型的匿名内部类作为参数传递给了 RpcHandler 的 $receive()$ 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private void processRpcRequest(final RpcRequest req) {
    try {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
    respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
    }

    @Override
    public void onFailure(Throwable e) {
    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    }
    });
    } catch (Exception e) {
    //...
    } finally {
    req.body().release();
    }
    }

    所有继承 RpcHandler 的子类都需要在其 $receive()$ 方法的具体实现中回调 RpcResponseCallback 的 $onSuccess$(处理成功时)或者$onFailure$ (处理失败时)方法。

    从 RpcResponseCallback 的实现来看,无论处理结果成功还是失败,都将调用 respond 方法对客户端进行响应。

  3. $processStreamRequest()$ 方法用于处理 StreamRequest 类型的消息

  4. $processOneWayMessage()$ 方法用于处理无需回复的 RPC 请求

    $processOneWayMessage()$ 方法的实现与 $processRpcRequest()$ 非常相似,区别在于 $processOneWayMessage()$ 调用了 只接收 TransportClient 和 ByteBuffer 两个参数的 $receive$ 方法,其 RpcResponseCallback 为默认的 ONE_WAY_CALLBACK

    因而 $processOneWayMessage()$ 在处理完 RPC 请求后不会对客户端作出响应。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private void processOneWayMessage(OneWayMessage req) {
    try {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    } catch (Exception e) {
    logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
    } finally {
    req.body().release();
    }
    }

2.2.1. RpcHandler

RpcHandler 是一个抽象类,定义了一些 RPC 处理器的规范。

RpcHandler
  1. receive

    这是一个抽象方法,用来接收单一的 RPC 消息,具体处理逻辑需要子类去实现。

    receive 接收 3 个参数,分别是 TransportClient、ByteBuffer 和 RpcResponseCallback。RpcResponseCallback 用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback 都会被调用一次。

    RpcResponseCallback 的接口定义如下。

    RpcResponseCallback
  2. 重载的 receive

    只接收 TransportClient 和 ByteBuffer 两个参数,RpcResponseCallback 为默认的 ONE_WAY_CALLBACK,其类型为OneWayRpcCallback,OneWayRpcCallback 其 onSuccess 和 onFailure 只是打印日志,并没有针对客户端做回复处理。

  3. channelActive

    当与给定客户端相关联的 channel 处于活动状态时调用。

  4. channelInactive

    当与给定客户端相关联的channel处于非活动状态时调用。

  5. exceptionCaught

    当 channel 产生异常时调用。

  6. getStreamManager

    获取 StreamManager, StreamManager 可以从流中获取单个的块,因此它也包含着当前正在被 TransportClient 获取的流的状态

2.3. TransporResponseHandler

Client 端处理 Server 响应的处理程序,在 Client 端发送消息时,根据发送消息的类型调用 TransportResponseHandler 中的方法注册回调函数,回调函数和请求信息放入相应的缓存中。待 TransportResponseHandler 收到 Server 端的响应消息时,再调用主要的工作方法 $handle()$ ,根据响应消息类型从对应缓存中取出回调函数并调用。