一、概述

创建传输客户端工厂 TransportClientFactory 是 NettyRpcEnv 向远端服务发起请求的基础, Spark 与远端 RpcEnv 进行通信都依赖于 TransportClientFactory 生产的 TransportClient。

有了 TransportClientFactory, Spark 的各个模块就可以使用它创建 RPC 客户端 TransportClient 了。每个 TransportClient 实例只能和一个远端的 RPC 服务通信,所以 Spark 中的组件如果想要和多个 RPC 服务通信,就需要持有多个 TransportClient 实例。

二、实现

$TransportContext.createClientFactory()$ 方法可以创建 TransportClientFactory 的实例,在 TransportContext 中有两个重载的 $createClientFactory()$ 方法, 它们最终在构造 TransportClientFactory 时都会传递两个参数: TransportContext 和 TransportClientBootstrap 列表,通过调用 $TransportContext.createClientFactory()$ 方法创建传输客户端工厂 TransportClientFactory 的实例。

2.1. TransportClientFactory

创建传输客户端工厂 TransportClientFactory 是 NettyRpcEnv 向远端服务发起请求的基础, Spark 与远端 RpcEnv 进行通信都依赖于 TransportClientFactory 生产的 TransportClient

TransportClientFactory 是 RPC 客户端的工厂类。在构造 TransportClientFactory 的实例时,会传递客户端引导程序 TransportClientBootstrap 的列表。此外,TransportClientFactory 内部还存在针对每个 Socket 地址的连接池 ClientPool。

2.2. TransportClient

有了 TransportClientFactory, Spark 的各个模块就可以使用它创建 RPC 客户端 TransportClient 了。每个 TransportClient 实例只能
和一个远端的RPC服务通信,所以Spark中的组件如果想要和多个RPC服务通信,就需要持有多个Transportclient实例。

三、TransportClientFactory

3.1. 初始化

TransportClientFactory 构造器中的各个变量如下

  1. context

    参数传递的 TransportContext 的引用

  2. conf

    通过调用 $Transportcontext.getconf()$ 获取 TransportConf

  3. clientBootstraps

  4. connectionPool

    针对每个 Socket 地址的连接池 ClientPool 的缓存。TransportClientFactory 内部还存在针对每个 Socket 地址的连接池 ClientPool

  5. numConnectionsPerPeer

    从 TransportConf 获取的 key 为 spark.+模块名+io.numConnectionsPerPeer 的属性值,用于指定对等节点间的连接数。

    这里的模块名为 TransportConf 的 module 字段。

  6. rand

    对 Socket 地址对应的连接池 ClientPool 中缓存的 TransportClient 进行随机选择,对每个连接做负载均衡。

  7. ioMode

    I/O 模式, 即从 TransportConf 获取 key 为 spark.+模块名+io.mode 的属性值。默认值为 NIO.Spark 还支持 EPOLL。

  8. socketChannelClass

    客户端 Channel 被创建时使用的类,通过 ioMode 来匹配,默认 NioSocketChannel,Spark 还支持 EpollEventLoopGroup。

  9. workerGroup

    根据 Netty 的规范, 客户端只有 worker 组, 所以此处创建 workerGroup。 workerGroup 的实际类型是 NioEventLoopGroup。

  10. pooledAllocator

    汇集 ByteBuf 但对本地线程缓存禁用的分配器。

四、TransportClient

有了 TransportClientFactory, Spark 的各个模块就可以使用它创建 RPC 客户端 TransportClient 了。每个 TransportClient 实例只能和一个远端的 RPC 服务通信,所以 Spark 中的组件如果想要和多个 RPC 服务通信,就需要持有多个 TransportClient 实例

image-20230824075957210

4.1.结构

4.2. TransportClient 初始化

  1. 调用 $InetSocketAddress.createUnresaVed()$ 构建 InetSocketAddress(这种方式创建 InetSocketAddress,可以在
    缓存中已经有 TransportClient 时避免不必要的域名解析〉,然后从 connectionPool 中获取与此地址对应的 ClientPool,如果没
    有,则需要新建 ClientPool,并放人缓存 connectionPool 中。

  2. 根据 numConnectionsPerPeer 的大小(使用 spark.+模块名+.io.numConnectionsPerPeer 属性值),从 ClientPool 中随

4.3. 发送请求

TransportClient 一共有 5 个方法用于发送请求

  1. fetchChunk 从远端协商好的流中请求单个块

  2. stream 使用流的 ID, 从运端获取流数据

  3. sendRpc() 向服务端发送 RPC 的请求, 通过 At least Once Delivery 原则保证请求不会丢失。

  • 使用 UUID 生成请求主键 requestId

  • 调用 addRpcRequest() 向 handler 添加口 requestId 与回调类 RpcResponseCallback 的引用之间的关系。

    TransportResponseHandler 的 addRpcReques() 方法将更新最后一次请求的时间为当前系统时间,然后将 requestId 与
    RpcResponseCallback 之间的映射加入到 outstandingRpcs 缓存中。

    outstandingRpcs 用于缓存发出的 RPC 请求信息

  • 调用 Channel 的 writeAndFlush() 方法将 RPC 请求发送出去

    调用 Channel 的 writeAndFlush() 方法将 RPC 请求发送出去, 当发送成功或者失败时会回调 ChannelFutureListener 的operationComplete() 方法。如果发送成功, 那么只会打印 requestId、远端地址及花费时间的日志, 如果发送失败, 除了打印错误日志外,还要调用 TransportResponseHandler 的 removeRpcRequest() 方法, 将此次请求从 outstandingRpcs 缓存中移除

请求发送成功后, 客户端将等待接收服务端的响应, 返回的消息会传递给 TransportChannelIHandler 的 channelRead() 方法, 最后交给 TransportResponseHandler 的 handle() 方法来处理。

  1. sendRpcSync() 向服务端发送异步的RPC的请求,并根据指定的超时时间等待响应。
  2. send() 向服务端发送RPC的请求,但是并不期望能获取响应,因而不能保证投递的可靠性。