一、概述

客户端的请求会经过路由层,通过路由层内部的规则去匹配对应的 provider 服务~

二、实现

2.1. 客户端

Hadoop RPC 巧妙地使用了 Java 动态代理机制,ClientNamenodeProtocolTranslatorPB 持有的 ClientNamenodeProtocolPB 对象其实是通过 Java动态代理机制获取的一个 ClientNamenodeProtocolPB 接口的代理对象,调用 RPC.getProtocolProxy() 方法获取,这个代理对象内部封装了一个 ProtobufRpcEngine.Invoker对象。对 ClientNamenodeProtocolPB 接口的调用都会由这个 Invoker 对象的 invoke()方法代理。

invoke2

Invoker.invoke() 方法会首先构造一个描述 RPC 调用信息的对象 RequestHeaderProto,记录了客户端在什么协议上调用了什么方法(在 ClientProtocol 协议上调用了 rename 方法)

1
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
  1. ProtobufRpcEngine.Invoker.invoker() 方法主要做了三件事情:

    • 构造请求头域,使用 protobuf 将请求头序列化,这个请求头域记录了当前 RPC 调用是什么接口的什么方法上的调用

      1
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
    • 通过 RPC.Client 类发送请求头以及序列化好的请求参数。请求参数是在 ClientNamenodeProtocolPB 调用时就已经序列化好的,调用 Client.call() 方法时,需要将请求头以及请求参数使用一个 RpcRequestWrapper 对象封装

      1
      2
      3
      val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
      new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
      fallbackToSimpleAuth);
    • 获取响应信息,序列化响应信息并返回

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      Message prototype = null;
      try {
      // 获取返回参数类型: RenameResponseProto
      prototype = getReturnProtoType(method);
      } catch (Exception e) {
      //...
      }
      Message returnMessage;
      try {
      // 序列化响应信息并返回
      returnMessage = prototype.newBuilderForType().mergeFrom(val.theResponseRead).build();
      } catch (Throwable e) {
      //...
      }

然后将 RequestHeaderProto 对象以及调用参数对象 RenameRequestProto 包装成一个 RpcRequestWrapper 对象,调用底层 RPC.Client 类提供的 call()方法将 rename 请求发送到远程服务器了~

1
2
3
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);

Hadoop RPC 使用了 Java 动态代理机制,ClientNamenodeProtocolTranslatorPB 持有的 ClientNamenodeProtocolPB 对象其实是通过 Java 动态代理机制获取的一个 CientNamenodeProtocolPB 接口的代理对象(调用RPC.getProtocolProxy()方法获取),这个代理对象内部封装了一个 ProtobufRpcEngine.Invoker 对象。对 ClientNamenodeProtocolPB 接口的调用都会由这个 Invoker 对象的 invoke() 方法代理,
Invoker.invoke()方法会首先构造一个描述 RPC 调用信息的对象 RequestHeaderProto,记录了客户端在什么协议上调用了什么方法(在 ClientProtocol 协议上调用了 rename 方法),然后将 RequestHeaderProto 对象以及调用参数对象 RenameRequestProto 包
装成一个 RpeRequestWrapper 对象,最后就可以调用底层 RPC.Client 类提供的 call()方法将rename 请求发送到远程服务器了。

由于这里巧妙地使用了动态代理机制,Invoker 调用 RPC.Client.call() 方法发送请求到远端的所有操作对ClienlNamenodeProtocolTranslatorPB 对象都是透明的,客户端调用 ClientNamenodeProtocolTranslatorPB.rename()方法就与调用本地方法一样。

2.2. 服务端

当 rename 请求到达服务器端之后,服务器端是怎么找到对应的响应程序呢🤔️~

截屏2022-08-21 14.07.55

服务器端 Stub 程序会将通信模块接收的数据反序列化,然后调用服务程序对应的方法响应这个 RPC 请求。Server 类是服务器端从网络接收 RPC 请求的类,服务器端调用 RPC.Builder.build()方法获取 Server 对象的引用,成功地接收一个 RPC 请求后,Server 会调用 call()方法响应这个请求。

2.3.1. 获取 BlockingService 对象

ClientNamenodeProtocol.newReflectiveBlockingService() 会构造一个匿名的 BlockingService 对象并返回,这个匿名的 BlockingService 对象定义了一个callBlockingMethod() 方法。

callBlockingMethod()方法接受三个参数

  • method 参数描述了当前RPC 调用的方法信息
  • controller 参数在这里默认为 null,不使用
  • request 参数记录了 RPC 调用的参数信息。

callBlockingMethod() 方法会根据 method 参数记录的调用方法信息,在 impl 引用上调用对应的方法。这里的 impl 引用是 ClientNamenodeProtocolServerSideTranslatorPB 类型,ClientNamenodeProtocolServerSideTranslatorPB 会将 ClientNamenodeProtocolPB 调用的参数反序列化,然后前转到 NamenodeRpcServer 对象上执行 RPC 操作。

这样,Server 对象监听到 RPC 请求后,只需通过请求头域中的接口信息获取对应的 BlockingService 对象,然后在这个 BlockingService 对象上调用 callBlockingMethod() 就可以触发 NameNodeRpcServer 对象响应这个 RPC 请求了。