Spark-源码学习-通信服务-架构设计-传输层设计-客户端
一、概述
创建传输客户端工厂 TransportClientFactory 是 NettyRpcEnv 向远端服务发起请求的基础, Spark 与远端 RpcEnv 进行通信都依赖于 TransportClientFactory 生产的 TransportClient。
有了 TransportClientFactory, Spark 的各个模块就可以使用它创建 RPC 客户端 TransportClient 了。每个 TransportClient 实例只能和一个远端的 RPC 服务通信,所以 Spark 中的组件如果想要和多个 RPC 服务通信,就需要持有多个 TransportClient 实例。
![](https://note3.oss-cn-hangzhou.aliyuncs.com/source/image-20230902114131159.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_350)
二、实现
$TransportContext.createClientFactory()$ 方法可以创建 TransportClientFactory 的实例,在 TransportContext 中有两个重载的 $createClientFactory()$ 方法, 它们最终在构造 TransportClientFactory 时都会传递两个参数: TransportContext 和 TransportClientBootstrap 列表,通过调用 $TransportContext.createClientFactory()$ 方法创建传输客户端工厂 TransportClientFactory 的实例。
2.1. TransportClientFactory
创建传输客户端工厂 TransportClientFactory 是 NettyRpcEnv 向远端服务发起请求的基础, Spark 与远端 RpcEnv 进行通信都依赖于 TransportClientFactory 生产的 TransportClient
TransportClientFactory 是 RPC 客户端的工厂类。在构造 TransportClientFactory 的实例时,会传递客户端引导程序 TransportClientBootstrap 的列表。此外,TransportClientFactory 内部还存在针对每个 Socket 地址的连接池 ClientPool。
2.2. TransportClient
有了 TransportClientFactory, Spark 的各个模块就可以使用它创建 RPC 客户端 TransportClient 了。每个 TransportClient 实例只能
和一个远端的RPC服务通信,所以Spark中的组件如果想要和多个RPC服务通信,就需要持有多个Transportclient实例。
三、TransportClientFactory
3.1. 初始化
TransportClientFactory 构造器中的各个变量如下
context
参数传递的 TransportContext 的引用
conf
通过调用 $Transportcontext.getconf()$ 获取 TransportConf
clientBootstraps
connectionPool
针对每个 Socket 地址的连接池 ClientPool 的缓存。TransportClientFactory 内部还存在针对每个 Socket 地址的连接池 ClientPool
numConnectionsPerPeer
从 TransportConf 获取的 key 为
spark.+模块名+io.numConnectionsPerPeer
的属性值,用于指定对等节点间的连接数。这里的模块名为 TransportConf 的 module 字段。
rand
对 Socket 地址对应的连接池 ClientPool 中缓存的 TransportClient 进行随机选择,对每个连接做负载均衡。
ioMode
I/O 模式, 即从 TransportConf 获取 key 为
spark.+模块名+io.mode
的属性值。默认值为 NIO.Spark 还支持 EPOLL。socketChannelClass
客户端 Channel 被创建时使用的类,通过 ioMode 来匹配,默认 NioSocketChannel,Spark 还支持 EpollEventLoopGroup。
workerGroup
根据 Netty 的规范, 客户端只有 worker 组, 所以此处创建 workerGroup。 workerGroup 的实际类型是 NioEventLoopGroup。
pooledAllocator
汇集 ByteBuf 但对本地线程缓存禁用的分配器。
四、TransportClient
有了 TransportClientFactory, Spark 的各个模块就可以使用它创建 RPC 客户端 TransportClient 了。每个 TransportClient 实例只能和一个远端的 RPC 服务通信,所以 Spark 中的组件如果想要和多个 RPC 服务通信,就需要持有多个 TransportClient 实例
![image-20230824075957210](https://note3.oss-cn-hangzhou.aliyuncs.com/source/image-20230824075957210.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_830)
4.1.结构
4.2. TransportClient 初始化
调用 $InetSocketAddress.createUnresaVed()$ 构建 InetSocketAddress(这种方式创建 InetSocketAddress,可以在
缓存中已经有 TransportClient 时避免不必要的域名解析〉,然后从 connectionPool 中获取与此地址对应的 ClientPool,如果没
有,则需要新建 ClientPool,并放人缓存 connectionPool 中。根据
numConnectionsPerPeer
的大小(使用spark.+模块名+.io.numConnectionsPerPeer
属性值),从 ClientPool 中随
4.3. 发送请求
TransportClient 一共有 5 个方法用于发送请求
fetchChunk 从远端协商好的流中请求单个块
stream 使用流的 ID, 从运端获取流数据
sendRpc() 向服务端发送 RPC 的请求, 通过 At least Once Delivery 原则保证请求不会丢失。
使用 UUID 生成请求主键 requestId
调用 addRpcRequest() 向 handler 添加口 requestId 与回调类 RpcResponseCallback 的引用之间的关系。
TransportResponseHandler 的 addRpcReques() 方法将更新最后一次请求的时间为当前系统时间,然后将 requestId 与
RpcResponseCallback 之间的映射加入到 outstandingRpcs 缓存中。outstandingRpcs 用于缓存发出的 RPC 请求信息
调用 Channel 的 writeAndFlush() 方法将 RPC 请求发送出去
调用 Channel 的 writeAndFlush() 方法将 RPC 请求发送出去, 当发送成功或者失败时会回调 ChannelFutureListener 的operationComplete() 方法。如果发送成功, 那么只会打印 requestId、远端地址及花费时间的日志, 如果发送失败, 除了打印错误日志外,还要调用 TransportResponseHandler 的 removeRpcRequest() 方法, 将此次请求从 outstandingRpcs 缓存中移除
请求发送成功后, 客户端将等待接收服务端的响应, 返回的消息会传递给 TransportChannelIHandler 的 channelRead() 方法, 最后交给 TransportResponseHandler 的 handle() 方法来处理。
- sendRpcSync() 向服务端发送异步的RPC的请求,并根据指定的超时时间等待响应。
- send() 向服务端发送RPC的请求,但是并不期望能获取响应,因而不能保证投递的可靠性。