Spark-源码学习-SparkCore-通信服务-架构设计-路由层-Inbox
一、概述
Spark RPC 的 Socket I/O 一个典型的 Reactor 模型, 但是结合了 Actor 模型中的 mailbox(Inbox/OutBox), 是一种混合的实现方式。Inbox 是一个存储消息的队列,它负责将消息存储在一个队列中,然后以线程安全的方式将消息发送到 RpcEndpoint。
二、实现
2.1. 结构
![](https://note3.oss-cn-hangzhou.aliyuncs.com/source/image-20230827221325248.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_30)
2.1.1. 属性
messages: 向其他远端 NettyRpcEnv 上的所有 RpcEndpoint 发送的消息列表
消息列表 messages 中的消息类型为 OutboxMessage, 所有将要向远端发送的消息都会被封装成 OutboxMessage 类型。
OutboxMessage 在客户端使用, 是对外发送消息的封装。InboxMessage 在服务端使用, 是对所接收消息的封装。
TransportClient 的 $sendRpc()$ 方法的第二个参数是 RpcResponseCallback 类型, RpcOutboxMessage 本身也实现了 RpcResponseCallback, 所以调用的时候传递了 RpcOutboxMessage 的 this 引用。
stopped: 当前 Inbox 是否停止的状态
2.1.2. sealed trait
在 Inbox 类中还定义了几个私有的内部类型,它们都扩展了 InboxMessage 这个 sealed trait
。这些内部类型用于表示可以在收件箱中发送的不同类型的消息,包括:
- OneWayMessage: 表示发送方只需要将消息发送到接收方,但是不需要接收方的回复。
- RpcMessage: 表示发送方需要将消息发送到接收方,并且需要接收方的回复。
- OnStart: 表示这个收件箱(以及与之关联的端点)开始运行。
- OnStop: 表示这个收件箱(以及与之关联的端点)停止运行。
- RemoteProcessConnected: 表示一个远程进程已连接。
- RemoteProcessDisconnected: 表示一个远程进程已断开连接。
- RemoteProcessConnectionError: 表示一个远程连接出现错误
三、初始化
Inbox 初始化时自动投递 OnStart 消息,让 RpcEndpoint 做一些准备工作
1 | inbox.synchronized { |
四、处理消息
在 Inbox 类中,有一个方法 $process()$ 用于处理存储的消息。这个方法会循环遍历消息列表,并使用一个调度程序(Dispatcher)来处理每个消息。对于每个消息,这个方法会检查消息的类型,并使用相应的方法来处理它:
- 如果消息是一个 RpcMessage,则调用端点的 $receiveAndReply$ 方法来处理这个消息。如果在处理过程中发生了异常,则会调用回调上下文的 $sendFailure$ 方法并将异常抛出。
- 如果消息是一个 OneWayMessage,则调用端点的 $receive$ 方法来处理这个消息。
- 如果消息是一个 OnStart,则调用端点的 $onStart$ 方法,并且如果端点不是一个线程安全的端点,则会启用并发处理。
- 如果消息是一个 OnStop,则调用端点的 $onStop$ 方法,并且会从调度程序中移除端点的引用。
- 对于其他类型的消息,如果端点的 onConnected、onDisconnected 或 onNetworkError 方法已被重写,则会调用相应的方法来处理消息。
![](https://note3.oss-cn-hangzhou.aliyuncs.com/source/image-20230827223338471.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_330)
五、关闭
Inbox 关闭化时自动投递 OnStop 消息,让 RpcEndpoint 做一些收尾工作。
1 | def stop(): Unit = inbox.synchronized { |
5.1. 处理 OnStop 消息
$dispatcher.removeRpcEndpointRef()$
1
def removeRpcEndpointRef(endpoint: RpcEndpoint): Unit = endpointRefs.remove(endpoint)
$endpoint.onStop()$