在 Hadoop RPC 的使用小节中我们介绍了 DFSClient 会获取一个 ClientProtocol PB 协议的代理对象,并在这个代理对象上调用 RPC 方法,代理对象会调用 RPC.Client.call()方法将序列化之后的 RPC 请求发送到服务器。

一、概述

二、架构

client

2.1. Call

RPC.Client 中发送请求和接收响应是由两个独立的线程进行的,发送请求线程就是调用 Clientl.call() 方法的线程,而接收响应线程则是 call()启动的 Connection 线程。

那么这两个线程是如何同步 Server 发回的响应信息的呢?

  1. 线程 1 调用Client.call() 发送 RPC 请求到 Server,然后在这个请求对应的 Call 对象上调用 Call.wait() 方法等待 Server 发回响应信息。
  2. 当线程 2 从 Server 接收了响应信息后,会设置 Call.rpcResponse字段保存响应信息,然后调用 Cal.notify()方法唤醒线程 1。线程 1 被唤醒后,会取出 Call.rpcResponse 字段中记录的 Server 发回的响应信息并返回。

当 Server 成功地执行了 RPC 调用,并发回响应到接收线程后,接收线程会调用 Call.setRpeResponse() 方法保存 Server 发回的响应信息。如果 Server在执行 RPC 调用时出现异常,则接收线程会调用 Call.setException() 方法保存异常信息。

要特别注意的是,setRpcResponse()以及 setException()都会唤醒在 Call 对象上等待的请求发送线程。

2.2. Connection

内部类 Connection 是一个线程类,它提供了建立 Client 到 Server 的 Socket 连接、发送RPC 请求以及读取 RPC响应信息等功能。Connection 的字段多是与网络连接相关的,如 Socket输入输出流、超时时间、重发次数等。

三、实现

DFSClient 持有一个 ClientProtocol 对象向 Namenode 发送请求,DFSClient 持有的 ClientProtocol 对象其实是 ClientNameNodeProtocolTranslatorPB 的实例( ClientNameNodeProtocolTranslatorPB 实现了 ClientProtocol 接口)。

DFSClient 在 ClientProtocol 代理对象上调用 RPC 方法,代理对象会调用 RPC.Client.call()方法将序列化之后的RPC 请求发送到服务器。

2.2.1. Client 发送请求

Client 类只有一个入口 call() 方法。代理类会调用 Client.call() 方法将 RPC 请求发送到远程服务器,然后等待远程服务器的响应。如果远程服务器响应请求时出现异常,则在 call()方法中抛出异常。

  1. Client.call() 方法将 RPC 请求封裝成一个 Call 对象,Call 对象中保存了 RPC 调用的完成标志、返回值信息以及异常信息

    1
    final Call call = createCall(rpcKind, rpcRequest);
  2. 随后,Client.call() 方法会创建一个 Connection 对象,Connection 对象用于管理 Client 与 Server 的 Socket 连接。

  3. 用 Connectionld 作为 key,将新建的 Connection 对象放入 Client.connections 字段中保存

    对于 Connection 对象,由于涉及了与 Server 建立 Socket 连接,比较耗费资源,所以 Client 类使用一个 HashTable 对象 connections 保存那些没有过期的 Connection,如果可以复用,则复用这些 Connection 对象

    以 callld 作为 key,将构造的 Call 对象放入 Connection.calls 字段中保存。

  4. Client.call() 方法调用 Connection.setupIOstreams() 方法建立与 Server 的 Socket 连接。Connection.setupIOstreams()方法启动 Connection 线程监听 Socket 并读取 Server 发回的响应信息。

  5. Client.call() 方法调用 Connection.sendRpcRequest() 方法发送 RPC 请求到 Server。

  6. Client.call() 方法调用 Call.wait() 在 Call 对象上等待,等待 Server 发回响应信息。

  7. Connection 线程收到 Server 发回的响应信息,根据响应消息中携带的信息找到对应的 Call 对象,然后设置 Call 对象的返回值字段,并调用 call.notify() 唤醒调用 Client.call() 方法的线程读取 Call 对象的返回值。

2.2.3. 接受 Server 响应

RPC.Client 中发送请求和接收响应是由两个独立的线程进行的,发送请求线程就是调用Client.call() 方法的线程,而接收响应线程则是 call() 启动的 Connection 线程。那么这两个线程是如何同步 Server 发回的响应信息的呢?

  • 线程 1 调用Client.call()发送 RPC 请求到 Server,然后在这个请求对应的 Call 对象上调用 Call.wait()方法等待 Server 发回响应信息。
  • 当线程2 从 Server 接收了响应信息后,会设置 Call.rpcResponse 字段保存响应信息,然后调用 Call.notify() 方法唤醒线程 1。线程 1 被唤醒后,会取出Call.rpcResponse 字段中记录的 Server 发回的响应信息并返回。