Hadoop 源码学习-Hadoop RPC-传输层-服务端设计
服务器端代码获取了 Server 对象后,会启动这个 Server 对象监听网络上的 RPC 请求并触发响应操作。为了提高性能,Server 类采用了 Java NIO 提供的基于 Reactor 设计模式的事件驱动 I/O 模型,当 Server 完整地从网络接收一个 RPC 请求后,会调用 call()方法响应这个请求
一、概述
为了提高性能,Server 类采用了很多技术来提高并发能力,包括线程池、JavaNIO 提供的 Reactor 模式等,其中 Reactor 模式贯穿了整个 Server 的设计。
1.1. 模式 Reactor
二、架构
Server 类是服务器端从网络接收 RPC 请求的类,成功地接收一个 RPC 请求后,Server 会调用 call() 方法响应这个请求。
服务端 NameNodeRpcServer 会启动一个 RPC.Server 监听来自客户端的所有 RPC 请求,当 RPC Server 在网络上监听到一个 RPC 请求时,解析这个请求,然后构造 ProtoBufRpcInvoker 对象来处理这个请求。
2.1. 组件
2.1.1. Listener
类似于 Reactor 模式中的 mainReactor。Listener 对象中存在一个 Selector 对象 acceptSelector,负责监听来自客户端的 Socket 连接请求。
当 acceptSelector 监听到连接请求后,Listener 对象会初始化这个连接,之后采用轮询的方式从 readers 线程池中选出一个 Reader 线程处理 RPC 请求的读取操作。
2.1.2. Reader
与 Reactor 模式中的 Reader 线程相同,用于读取 RPC 请求。Reader 线程类中存在一个 Selector 对象 readSelector,类似于 Reactor 模式中的 readReactor,用于监听网络中是否有可以读取的 RPC 请求。当 readSelector 监听到有可读的 RPC 请求后,会唤醒 Reader 线程读取这个请求,并将请求封装在一个 Call 对象中,然后将这个 Call 对象放入共享队列 CallQueue 中。
2.1.3. 内部类 Connection
Connection 类维护 了 Server 与 Client 之间的 Socket 连接。Reader 线程会调用 readAndProcess() 方法从 IO 流中读取一个 RPC 请求。
2.1.4. Handler
与 Reactor 模式中的 Handler 类似,用于处理 RPC 请求并发回响应。Handler 对象会从 CallQueue 中不停地取出 RPC 请求,然后执行 RPC 请求对应的本地函数,最后封装响应并将响应发回客户端。为了能够并发地处理 RPC 请求,Server 中会存在多个 Handler 对象。
2.1.5. Responder
内部类 Responder 是一个线程类,用于向客户端发送 RPC 响应,Server 端仅有一个 Responder 对象,Responder 内部包含一个 Selector 对象 responseSelector,用于监听 SelectionKey.OP_WRITE
事件。当网络环境不佳或者响应信息太大时,Handler 线程可能无法发送完整的响应信息到客户端,这时Handler 会在 Responder.responseSelector 上注册 SelectionKey.OP_WRITE
事件,responseSelector 会循环监听网络环境是否具备发送数据的条件,之后 responseSelector 会触发 Responder 线程发送未完成的响应结果到客户端。
读者可能会问,在 Handler 中不是已经发送 RPC 响应了吗?为什么还需要再实现一个 Responder 类?这是因为,在响应很大或者网络条件不佳等情况下,Handler 线程很难将完整的响应发回客户端,这就会造成 Handler 线程阻塞,从而影响 RPC 请求的处理效率。所以 Handler 在没能够将完整的 RPC 响应发回客户端时,会在 Responder 内部的 respondSelector 上注册一个写响应事件,这里的 respondSelector 与 Reactor 模式的 respondSelector 概念相同,当respondSelector 监听到网络情况具备写响应的条件时,会通知 Responder 将剩余响应发回客户端。
三、实现
3.1. 接受 RPC 请求
Listener 线程的 acceptSelector 在 ServerSocketChannel 上注册 OP_ACCEPT 事件,并且创建 readers 线程池。每个 Reader 的 readSelector 此时并不监听任何 Channel。
Client 发送 Socket 连接请求,触发 Listener 的 acceptSelector 唤醒 Listener 线程。
Listener 调用 ServerSocketChannel.accept() 创建一个新的 SocketChannel。
Listener 从 readers 线程池中挑选一个线程,并在 Reader 的 readSeleetor 上注册
OP READ
事件。Client 发送 RPC 请求数据包,触发 Reader 的 selector 唤醒 Reader 线程。
Reader 从 SocketChannel 中读取数据,封装成 Call 对象,然后放入共享队列 CallQueue 中。
最初,handlers 线程池中的线程都在 CallQueue(调用 BlockingQueue.take() 上阻塞,当有 Call 对象被放入后,其中一个 Handler 线程被唤醒,然后根据 Call 对象的信息调用 BlockingService 对象的 callBlockingMethod() 方法。随后,Handler 尝试将响应
写入 SocketChannel。如果 Handler 发现无法将响应完全写 入 SocketChannel,将在 Responder 的 respondSelector 上注册 OP WRITE 事件。当 Socket 恢复正常时,Responder 将被唤醒,继续写响应。当然,如果一个 Call 响应在一定时间内都无法被写入,则会被 Responder 移除。