一、概述

Spark RPC 的 Socket I/O 一个典型的 Reactor 模型, 但是结合了 Actor 模型中的 mailbox(Inbox/OutBox), 是一种混合的实现方式。Inbox 是一个存储消息的队列,它负责将消息存储在一个队列中,然后以线程安全的方式将消息发送到 RpcEndpoint。

二、实现

2.1. 结构

2.1.1. 属性

  1. messages: 向其他远端 NettyRpcEnv 上的所有 RpcEndpoint 发送的消息列表

    消息列表 messages 中的消息类型为 OutboxMessage, 所有将要向远端发送的消息都会被封装成 OutboxMessage 类型。

    OutboxMessage 在客户端使用, 是对外发送消息的封装。InboxMessage 在服务端使用, 是对所接收消息的封装。

    Outbox

    TransportClient 的 $sendRpc()$ 方法的第二个参数是 RpcResponseCallback 类型, RpcOutboxMessage 本身也实现了 RpcResponseCallback, 所以调用的时候传递了 RpcOutboxMessage 的 this 引用。

  2. stopped: 当前 Inbox 是否停止的状态

2.1.2. sealed trait

在 Inbox 类中还定义了几个私有的内部类型,它们都扩展了 InboxMessage 这个 sealed trait。这些内部类型用于表示可以在收件箱中发送的不同类型的消息,包括:

  1. OneWayMessage: 表示发送方只需要将消息发送到接收方,但是不需要接收方的回复。
  2. RpcMessage: 表示发送方需要将消息发送到接收方,并且需要接收方的回复。
  3. OnStart: 表示这个收件箱(以及与之关联的端点)开始运行。
  4. OnStop: 表示这个收件箱(以及与之关联的端点)停止运行。
  5. RemoteProcessConnected: 表示一个远程进程已连接。
  6. RemoteProcessDisconnected: 表示一个远程进程已断开连接。
  7. RemoteProcessConnectionError: 表示一个远程连接出现错误

三、初始化

Inbox 初始化时自动投递 OnStart 消息,让 RpcEndpoint 做一些准备工作

1
2
3
inbox.synchronized {
messages.add(OnStart)
}

四、处理消息

在 Inbox 类中,有一个方法 $process()$ 用于处理存储的消息。这个方法会循环遍历消息列表,并使用一个调度程序(Dispatcher)来处理每个消息。对于每个消息,这个方法会检查消息的类型,并使用相应的方法来处理它:

  1. 如果消息是一个 RpcMessage,则调用端点的 $receiveAndReply$ 方法来处理这个消息。如果在处理过程中发生了异常,则会调用回调上下文的 $sendFailure$ 方法并将异常抛出。
  2. 如果消息是一个 OneWayMessage,则调用端点的 $receive$ 方法来处理这个消息。
  3. 如果消息是一个 OnStart,则调用端点的 $onStart$ 方法,并且如果端点不是一个线程安全的端点,则会启用并发处理。
  4. 如果消息是一个 OnStop,则调用端点的 $onStop$ 方法,并且会从调度程序中移除端点的引用。
  5. 对于其他类型的消息,如果端点的 onConnected、onDisconnected 或 onNetworkError 方法已被重写,则会调用相应的方法来处理消息。

五、关闭

Inbox 关闭化时自动投递 OnStop 消息,让 RpcEndpoint 做一些收尾工作。

1
2
3
4
5
6
7
8
9
10
def stop(): Unit = inbox.synchronized {
if (!stopped) {
// We should disable concurrent here. Then when RpcEndpoint.onStop is called, it's the only
// thread that is processing messages. So `RpcEndpoint.onStop` can release its resources
// safely.
enableConcurrent = false
stopped = true
messages.add(OnStop)
}
}

5.1. 处理 OnStop 消息

  1. $dispatcher.removeRpcEndpointRef()$

    1
    def removeRpcEndpointRef(endpoint: RpcEndpoint): Unit = endpointRefs.remove(endpoint)
  2. $endpoint.onStop()$