一、概述

二、实现

Namenode 定义了 NameNodeRpcServer 类响应来自 HDFS 集群中其他节点的 RPC 请求,NameNodeRpeServer 实现了包括 ClientProtocol、 NamenodeProtocol、DatanodeProtocol 以及 HAServiceProtocol 在内的所有需要与 Namenode 交互的 RPC 协议接口。

NameNodeRpcServer

Namenode 会在它的初始化方法 initialize() 中调用 createRpcServer() 创建 NameNodeRpcServer 对象的实例,createRpcServer() 方法会直接调用 NameNodeRpcServer 的构造方法~

2.1. 初始化

NameNodeRpcServer 的构造方法首先设置了 RPC 类的序列化引擎为 protobuf,然后构造了两个 RPC.Server 对象:

  1. clientRpcServer 用于响应来自 HDFS 客户端的RPC 请求;
  2. serviceRpcServer 则用于响应来自 Datanode 的 RPC 请求。

NameNodeRpcServer 的构造方法代码如下:

2.1.1. 设置 RPC 引擎为 protobufEngine

1
RPC.setProtocolEngine (conf, ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);

2.1.2. 构造 ClientNamenodeProtocolServersideTranslatorPB 对象

1
2
ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator = 
new ClientNamenodeProtocolServerSideTranslatorPB(this);

ClientNamenodeProtocolServerSideTranslatorPB 对象是将 ClientNamenodeProtocolPB 接口调用适配成 ClientProtocol 调用的适配器类,它内部会持有一个实现了ClientProtocol 接口的对象,将 ClientNamenodeProtocolPB 调用的参数反序列化之后,它会调用 ClientProtocol 对象的对应方法执行 RPC 操作。

2.1.3. 构造 BlockingService

通过调用 ClientNamenodeProtocol.newReflectiveBlockingService(BlockingInterfaceimpl)方法获取 ClientNamenodeProtocolPB 协议对应的 BlockingService 实现类。

1
2
BlockingService clientNNPbService = ClientNamenodeProtocol.
newReflectiveBlockingService(clientProtocolServerTranslator);

ClientNamenodeProtocol.newReflectiveBlockingService() 构造了一个匿名的 BlockingService 对象并返回

ClientNamenodeProtocol.newReflectiveBlockingService

这个匿名的 BlockingService 对象定义了一个 callBlockingMethod()方法。callBlockingMethod() 方法接受三个参数:

image-20221121111314505
  • method 参数描述了当前RPC 调用的方法信息

    callBlockingMethod() 方法会根据 method 参数记录的调用方法信息,在 impl 引用上调用对应的方法。这里的 impl 引用 是 ClientNamenodeProtocolServerSideTranslatorPB 类型的

  • controller 参数在这里默认为 null,不使用

  • request 参数记录了 RPC 调用的参数信息

ClientNamenodeProtocolServerSideTranslatorPB 会将 ClientNamenodeProtocolPB 调用的参数反序列化,然后前转到 NamenodeRpcServer 对象上执行 RPC 操作。这样,Server 对象监听到 RPC 请求后,只需通过请求头域中的接口信息获取对应的 BlockingService 对象,然后在这个 BlockingService 对象上调用 calBlockingMethod()就可以触发 NameNodeRpeServer 对象响应这个 RPC 请求了。

2.1.4. 初始化 Server

NameNodeRpcServer 的构造方法调用了 RPC.Server.build() 方法构造 Server 对象

build() 参数 protocol(ClientNamenodeProtocolPB.class) 和 impl (clientNNPBService)设置了 ClientNamenodeProtocolPB 协议与它的 BlockingService 响应类的对应关系

也就是说,ClientNamenodeProtocolPB 的调用会由 BlockingService 对象 clientNNPBService 来响应。

来看看👀~build()~~~

1
2
3
4
5
6
7
8
9
this.serviceRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
  1. build() 方法首先调用 getProtocolEngine() 获取当前 RPC 类配置的 RpcEngine 对象

    在 NameNodeRpcServer 的构造方法中己经将当前 RPC 类的 RpcEngine 对象设置为 ProtobufRpcEngine了。

  2. 调用 getServer()方法获取一个 RPC Server 对象的引用

    客户端获取 Proxy 对象时是通过调用 RpcEngine.getProxy()方法实现的,对于不同的 RpcEngine 获取的 Proxy 对象是不同的。对于
    底层来说,就是 RPC 请求所使用的序列化工具是不同的。同理,对于 RPC.Server 来说,不同的 RpcEngine 构造 RPC.Server 对象也是使用不同的反序列化工具,ProtobufRpcEngine.getServer()会返回使用 protobuf 作为反序列化工具的服务器,这个 RPC.Server 对象是 ProtobufRpcEngine的内部类 Server (RPC.Server 的子类),getServer()方法会构造这个对象并返回。

    ProtobufRpcEngine

    ProtobufRpcEngine.Server 是 RPC.Server 的子类,它的构造方法首先调用父类 RPC.Server 的构造方法,之后构造方法会调用 registerProtocolAndlmpl() 方法注册接口类 protocolClass 和实现类 protocolImpl 的映射关系。这样,当客户端的 RPC 请求到达时,就可以通过这个映射关系获得具体的实现类了。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public Server(Class<?> protocolClass, Object protocolImpl,
    Configuration conf, String bindAddress, int port, int numHandlers,
    int numReaders, int queueSizePerHandler, boolean verbose,
    SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig)
    throws IOException {
    // 父类
    super(bindAddress, port, null, numHandlers,
    numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
    .getClass().getName()), secretManager, portRangeConfig);
    this.verbose = verbose;
    registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl);
    }

    ProtobufRpcEngine.Server 类最重要的部分就是实现了一个 ProtoBufRpcInvoker 类,当 RPC.Server 类解析出来自网络的 RPC 请求后,会调用 ProtoBufRpcInvoker.call() 方法响应这个请求。

    call() 方法首先会从请求头中提取出 RPC 调用的接口名和方法名等信息,然后根据调用的接口信息获取对应的 BlockingService 对象,再根
    据调用的方法信息在 Blocking Service 对象上调用 callBlockingMethod() 方法并将调用前转到
    ClientNamenodeProtocolServerSideTranslatorPB 对象上,最终这个请求会由 NameNodeRpcServer 响应。

    通过对 call() 方法的分析我们知道,客户端在什么接口上调用什么方法是在请求头 requestHeader 中保存的,ProtoBufRpcInvoker 获取了接口信息之后,会调用 getProtocolImpl() 方法通过接口名获取对应的实现类。这个映射信息是在 RPC.Server 构造时创建的,同时也可以在 NameNodeRpcServer 的构造方法中调用 DFSUtil.addPBProtocol() 方法添加。

    getProtocolImpl()方法的实现如下:

    通过接口信息获取对应的 BlockingService 实现对象后,call() 方法会在这个 BlockingService 对象上调用 BlockingService.callBlockingMethod() 方法

  1. 初始化 serviceRpcserver 对象

    这个对象用于响应來自 Datanode 的请求

    • 构造 serviceRpcServer 对象,并配置 ClientProtocolPB 的响应类为 clientNNPbService
  2. 构造 clientRpcServer

    用于响应来自 HDFS 客户端的 RPC 请求

三、总结

NameNodeRpcServer

2.3.1. 服务器获取 Server 对象

Namenode 定义了 NameNodeRpcServer 类响应来自 HDFS 集群中其他节点的 RPC 请求,NameNodeRpcServer 实现了包括 ClientProtocol、 NamenodeProtocol、DatanodeProtocol 以及 HAServiceProtocol 在内的所有需要与 Namenode 交互的RPC 协议接口。

Namenode 启动时 initialize()方法调用 createRpeServer()创建 NameNodeRpcServer对象的实例,createRpcServer() 直接调用 NameNodeRpcServer 的构造方法,首先设置 RPC 序列化引擎为 protobuf

1
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);

然后构造两个 RPC.Server 对象: clientRpcServer 和 serviceRpcServer

  1. clientRpcServer

    NameNodeRpcServer 会启动 clientRpcServer 监听来自 HDFS 客户端的所有 RPC 请求,当 RPC Server 在网络上监听到一个 RPC 请求时,它会从网络中解析这个请求,然后构造一个 ProtoBufRpclnvoker 对象来处理这个请求。

    1
    2
    3
    4
    5
    6
    this.clientRpcServer = new RPC.Builder(conf)
    .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
    .setInstance(clientNNPbService).setBindAddress(bindHost)
    .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
    .setVerbose(false)
    .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();

    clientRpcServer 设置 protocol 参数为 ClientNamenodeProtocolPB,同时设置了 impl 参数为 clientNNPBService。两个参数设置了 ClientNamenodeProtocolPB 协议与它的 BlockingService 响应类的对应关系,也就是说,ClientNamenodeProtocolPB 的调用会由 BlockingService 对象 clientNNPBService 来响应。

    ProtocolPB 协议接口与BlockingService 对象之间的映射关系保证了 RPC 请求到达 Server 后,Server 可以找到正确的响应类来执行相应操作。

    —还是以 rename() 方法为例: —

    callBlockingMethod() 方法根据调用方法信息判断这是一个 ClientNamenodeProtocolPB.rename()调用,它会在ClientNamenodeProtocolServerSideTranslatorPB对象上调用 rename(RenameRequestProto)方法响应,ClientNamenodeProtocolServerSideTranslatorPB.rename()方法会将 RenameRequestProto 参数反序列化成两个 String 参数,然后在自己持有的实现了 ClientProtocol 接口的 NameNodeRpcServer 上调用 rename(String,String)方法响应,NameNodeRpcServer.rename()方法会在 Namenode 的命名空间中更改指定 HDFS 文件的名称,最后返回响应信息。

  2. serviceRpcServer

    serviceRpcServer 则用于响应来自 Datanode 的 RPC 请求

    1
    2
    3
    4
    5
    6
    7
    8
    this.serviceRpcServer = new RPC.Builder(conf)
    .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
    .setInstance(clientNNPbService)
    .setBindAddress(bindHost)
    .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
    .setVerbose(false)
    .setSecretManager(namesystem.getDelegationTokenSecretManager())
    .build();

来具体看看 build()方法到底做了什么😉~~~

build()方法首先调用 getProtocolEngine()获取当前 RPC 类配置的 RpcEngine 对象,在NameNodeRpeServer 的构造方法中已经将当前 RPC 类的 RpeEngine 对象设置为ProtobufRpcEngine 了。获取了 ProtobufRpcEngine 对象之后,build() 方法会在 ProtobufRpcEngine对象上调用 getServer()方法获取一个 RPC.Server 对象的引用。

对于 RPC.Server 来说,不同 RpcEngine 构造 RPC.Server 对象也是使用不同的反序列化工具,ProtobufRpcEngine.getServer() 会返回使用 protobuf 作为反序列化工具的RPC.Server

ProtobufRpcEngine.Server 是 RPC.Server 的子类,它的构造方法首先调用父类 RPC.Server 的构造方法,之后构造方法会调用 registerProtocolAndlmpl()方法注册接口类 protocolClass 和实现类 protocollmpl 的映射关系。这样,当客户端的 RPC 请求到达时,就可以通过这个映射关系获得具体的实现类了。

ProtobufRpcEngine.Server 类最重要的部分就是实现了一个 ProtoBufRpcInvoker 类,当RPC.Server 类解析出来自网络的 RPC 请求后,会调用 ProtoBufRpclnvoker.call()方法响应这个请求。

call() 方法首先会从请求头中提取出 RPC 调的接口名和方法名等信息,然后根据调用的接口信息获取对应的 BlockingService 对象

1
2
3
String methodName = rpcRequest.getMethodName();
String protoName = rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();

再根据调用的方法信息在 BlockingService 对象上调用 callBlockingMethod()方法并将调用前转到 ClientNamenodeProtocolServerSideTranslatorPB 对象上,最终这个请求会由 NameNodeRpcServer 响应。

1
2
3
4
try {
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param);
}

ClientNamenodeProtocolServerSideTranslatorPB 接收到NameNodeRpcServer 的响应信息后会将这个响应包裝成一个 protobuf 序列化的RenameResponseProto 对象,然后返回到 ProtoBufRpclnvoker 对象。

1
2
3
4
try {
boolean result = server.rename(req.getSrc(), req.getDst());
return RenameResponseProto.newBuilder().setResult(result).build();
}

ProtoBufRpclnvoker 接收到 RenameResponseProto 这个响应对象后,会构造一个 RpcResponse Wrapper 对象包装 RenameResponseProto,然后将这个对象返回给 RPC Client。