一、概述

RpcEnv 是各个组件之间通信的执行环境,每个节点之间(Driver 或者 Worker/Executor) 组件的 Endpoint 和对应的 EndpointRef 之间的信息通信和方法调用都是通过 RpcEnv 作协调。

RpcEnv 类似于 ActorSystem,服务端和客户端都可以使用它来做通信:

  1. 对于 server 端来说,RpcEnv 是 RpcEndpoint 的运行环境,负责 RpcEndPoint 的生命周期管理,解析 Tcp 层的数据包以及反序列化数据封装成 RpcMessage,然后根据路由传送到对应的 Endpoint;

  2. 对于 client 端来说,可以通过 RpcEnv 获取 RpcEndpoint 的引用 RpcEndpointRef,然后通过 RpcEndpointRef 与对应的 Endpoint 通信。

二、RpcEnv 初始化

RpcEnv 的初始化由 RpcEnvFactory 负责, RpcEnvFactory 目前只有一个子类实现: NettyRpcEnvFactory,初始化 NettyRpcEnv 的过程其实就是对内部各个子组件 RpcEndpoint、 Inbox/Outbox、 Dispatcher、 TransportContext、TransportServer 等组件的实例化过程。

2.1. 初始化 NettyRpcEnv

2.1.1. 初始化路由层

  1. 初始化 Dispatcher

    Dispatcher 负责将 rpc 消息路由到对此消息进行处理的 RpcEndpoint 上。

  2. 初始化 Outbox

    指令消息发件箱, 一个远程端点对应一个发件箱, 当消息放入 Outbox 后、紧接器保消息通过 TransportClient 发送出去。

    1
    private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

2.1.2. 初始化 NettyStreamManager

流式管理器,用于处理 RPC 环境中的文件,如自定义的配置文件或者 JAR 包。

2.1.3. 初始化传输层

  1. 初始化 TransportConf

    1
    2
    3
    4
    5
    6
    7
    val role = conf.get(EXECUTOR_ID).map { id =>
    if (id == SparkContext.DRIVER_IDENTIFIER) "driver" else "executor"
    }

    private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1), "rpc", conf.get(RPC_IO_THREADS).getOrElse(numUsableCores),
    role)
  2. 初始化 TransportContext

    1
    2
    private val transportContext = new TransportContext(transportConf, 
    new NettyRpcHandler(dispatcher, this, streamManager))
  3. 初始化传输层客户端 TransportClientFactory

    1
    private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

2.2. startServiceOnPort()

2.2.1. 获取 tryPort

1
2
3
4
5
val tryPort = if (startPort == 0) {
startPort
} else {
userPort(startPort, offset)
}

2.2.2. startNettyRpcEnv

1
2
3
4
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
  1. 初始化 TransportServer

    1
    server = transportContext.createServer(bindAddress, port, bootstraps)
  2. 注册 Dispatcher

    1
    dispatcher.registerRpcEndpoint(RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))

    在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。