Spark-源码学习-通信服务-RpcEnv
一、概述
RpcEnv 是各个组件之间通信的执行环境,每个节点之间(Driver 或者 Worker/Executor) 组件的 Endpoint 和对应的 EndpointRef 之间的信息通信和方法调用都是通过 RpcEnv 作协调。
RpcEnv 类似于 ActorSystem,服务端和客户端都可以使用它来做通信:
对于 server 端来说,RpcEnv 是 RpcEndpoint 的运行环境,负责 RpcEndPoint 的生命周期管理,解析 Tcp 层的数据包以及反序列化数据封装成 RpcMessage,然后根据路由传送到对应的 Endpoint;
对于 client 端来说,可以通过 RpcEnv 获取 RpcEndpoint 的引用 RpcEndpointRef,然后通过 RpcEndpointRef 与对应的 Endpoint 通信。
二、RpcEnv 初始化
RpcEnv 的初始化由 RpcEnvFactory 负责, RpcEnvFactory 目前只有一个子类实现: NettyRpcEnvFactory,初始化 NettyRpcEnv 的过程其实就是对内部各个子组件 RpcEndpoint、 Inbox/Outbox、 Dispatcher、 TransportContext、TransportServer 等组件的实例化过程。
2.1. 初始化 NettyRpcEnv
2.1.1. 初始化路由层
初始化 Dispatcher
Dispatcher 负责将 rpc 消息路由到对此消息进行处理的 RpcEndpoint 上。
引用本站文章Spark-源码学习-通信服务-架构设计-路由层-DispatcherJoker初始化 Outbox
指令消息发件箱, 一个远程端点对应一个发件箱, 当消息放入 Outbox 后、紧接器保消息通过 TransportClient 发送出去。
1
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
引用本站文章Spark-源码学习-通信服务-架构设计-路由层-OutboxJoker
2.1.2. 初始化 NettyStreamManager
流式管理器,用于处理 RPC 环境中的文件,如自定义的配置文件或者 JAR 包。
2.1.3. 初始化传输层
初始化 TransportConf
1
2
3
4
5
6
7val role = conf.get(EXECUTOR_ID).map { id =>
if (id == SparkContext.DRIVER_IDENTIFIER) "driver" else "executor"
}
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1), "rpc", conf.get(RPC_IO_THREADS).getOrElse(numUsableCores),
role)初始化 TransportContext
1
2private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))初始化传输层客户端 TransportClientFactory
1
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
2.2. startServiceOnPort()
2.2.1. 获取 tryPort
1 | val tryPort = if (startPort == 0) { |
2.2.2. startNettyRpcEnv
1 | val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => |
初始化 TransportServer
1
server = transportContext.createServer(bindAddress, port, bootstraps)
注册 Dispatcher
1
dispatcher.registerRpcEndpoint(RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。