一、概述

客户端的 Stub 可以看作是一个代理对象,它会将请求程序的 RPC 调用序列化,并调用 Client.call() 方法将这个请求发送给远程服务器,这些实现对于客户端请求程序是完全透明的。

proxy

二、初始化

ClientProtocol 对象定义了客户端与名字节点之间的所有接口,Hadoop 2.x 引入了 Namenode 的 HA 机制,也就是说,HDFS 集群中会存在两个 Namenode 实例,同一时间 DFSClient 只会将 ClientProtocol RPC 请求发送给集群中的 Active Namenode。而当集群发生错误切换时,DFSClient 又会将请求发送给新的 Active Namenode, 这些实现对于 DFSClient 来说是透明的,DFSClient 只需在 ClientProtocol 对象上发起 RPC 调用即可。

init
1
2
3
4
5
6
7
8
9
10
11
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
} else if (rpcNamenode != null) {
this.namenode = rpcNamenode;
dtService = null;
} else {
proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}

NameNodeProxies.createProxy()方法就是用于创建支持 HA 机制的 ClientProtocol 代理对象的,它会根据配置文件判断当前 HDFS 集群是否处
于 HA 模式。

  1. 对于处于 HA 模式的情况,createProxy() 方法调用 createFailoverProxyProvider() 方法创建支持 HA 机制的 ClientProtocol 对象
  2. 而对于非 HA 模式的情况,createProxy() 方法则会调用 createNonHAProxy() 方法创建普通的 ClientProtocol 对象
CREATE_PROXY

2.1. 非 HA 模式

非 HA 模式的入口方法是 NameNodeProxies.createNonHAProxy(),它会对 xface 参数也就是 RPC 接口进行判断,然后构造并返回实现了这个接口的对象。对于获取 ClientProtocol 代理的情况,createNonHAProxy 会调用 createNNProxyWithClientProtocol() 方法创建实现了
ClientProtocol 接口的 ClientNamenodeProtocolTranslatorPB 对象。

以 ClientProtocol 为例~👀createNNProxyWithClientProtocol() 方法的实现~

image-20221123205051602

2.1.1. 设置序列化方式

首先会设置当前 RPC 调用序列化方式

1
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);

这里配置的 ProtobufRpcEngine 类就定义了当前 RPC 调用是使用protobuf 作为序列化引擎的。

2.1.2. 获取 RetryPolicy

2.1.3. 获取代理对象

调用 RPC.getProtocolProxy() 方法获取 ClientNamenodeProtocolPB 协议的代理对象,然后构造ClientNamenodeProtocolTranslatorPB 对象并返回。

image-20221123205732202

RPC.getProtocolProxy() 用于获取一个指定 RPC 接口的代理对象。

1
2
3
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
  1. getProtocolEngine()

    getProtocolProxy() 会首先调用 getProtocolEngine() 方法通过反射获取当前 RPC 定义的 protocolEngine 对象的实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    static synchronized RpcEngine getProtocolEngine(Class<?> protocol, Configuration conf) {
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    if (engine == null) {
    Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),WritableRpcEngine.class);
    engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
    PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
    }

    以 ClientProtocol 为例,由于 createNNProxyWithClientProtocol() 的第一条语句就是注册 ClientProtocol 的序列化引擎为 ProtobufRpcEngine,使用 protobuf 作为序列化工具,这里 getProtocolEngine() 方法返回的是一个 ProtobufRpcEngine 的实例~

  2. 在 protocolEngine 对象上调用 getProxy() 获取使用特定序列化方式的接口代理对象。

    真正构造代理对象的方法其实是 RpcEngine.getProxy() 方法

    RpcEngine 是一个抽象类,它只定义了接口,具体的实现留给子类去做。ClientProtocol 默认是使用 ProtobufRpcEngine

    首先会构造一个实现了 InvocationHandler 接口的 invoker 对象, 动态代理机制中的 InvocationHandler 对象会在 invoke() 方法中代理所有目标接口上的调用,可以在 invoke() 方法中添加代理操作,接着调用 Proxy.newProxyInstance() 方法构造动态代理对象,将这个对象封装在 ProtocolProxy 对象中并返回。

    由于 Java 动态代理机制决定了在代理对象上的所有调用都会由 InvocationHandler 对象的invoke() 方法代理,所以所有在 ClientNamenodeProtocolPB 代理对象上的调用都会由这个 ProtobufRpcEngine.Invoker 对象的 invoker() 方法代理

2.2. HA 模式

对于 HA 模式,NameNodeProxies 调用 RetryProxy.creat() 方法构造实现了 RPC 协议的对象

RetryProxy 是一个工厂类,它会构造支持 HA 模式的协议对象,这个协议对象会首先尝试连接 HDFS 中的 ActiveNamenode,如果连接失败则会重试,如果重试达到一定的次数,则会切换到 HDFS 集群中的 StandbyNamenode。

1
2
3
4
5
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, 
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
config.maxRetryAttempts, config.failoverSleepBaseMillis,
config.failoverSleepMaxMillis));

RetryProxy 的工厂方法 RetryProxy() 调用了 Java 动态代理方法 Proxy.newProxyInstance()

1
2
3
4
5
6
7
8
public static <T> Object create(Class<T> iface,
FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
);
}

协议的代理对象上的调用都会由 RetryInvocationHandler 的 invoke() 方法统一代理

下面看看👀~ RetryInvocationHandler.invoke()方法的实现。

  1. 首先获取 RetryPolicy 对象

    RetryPolicy 定义了出现调用错误时的重试逻辑。这里默认的 RetryPolicy 是 failoverOnNetworkException。

    1
    2
    3
    4
    RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
    if (policy == null) {
    policy = defaultPolicy;
    }
  2. 通过反射调用 method 对象描述的方法

    1
    Object ret = invokeMethod(method, args);

    invokeMethod() 方法就是在 currentProxy 对象上调用 method 参数描述的方法 currentProxy 是泛型 T 类型——也就是接口 ClientProtocol 类型。

    currentProxy 是实现了 ClientProtocol 协议的对象,它是通过调用 FailoverProxyProvider.getProxy() 获得的。当 Namenode 发生主从切换后,currentProxy 字段会被赋值为新的 ActiveNamenode 对应的 ClientProtocol 的引用,之后在 ClientProtocol 上的调用也就会发送到新的 Active Namenode。

  3. 异常处理

    如果完成上述操作后没有抛出异常,客户端成功地将请求发送到了 ActiveNamenode 服务器。如果抛出异常,则说明远程调用出现了错误,这部分代码在 catch 段处理:

    • 判断操作是否是幂等的

      通过 annotation 判断这个操作是否是幂等的,也就是执行多次是没有问题的,例如 ClientProtocol 中的setReplication()

      1
      2
      3
      4
      5
      6
      7
      8
      boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface()
      .getMethod(method.getName(), method.getParameterTypes())
      .isAnnotationPresent(Idempotent.class);
      if (!isIdempotentOrAtMostOnce) {
      isIdempotentOrAtMostOnce = proxyProvider.getInterface()
      .getMethod(method.getName(), method.getParameterTypes())
      .isAnnotationPresent(AtMostOnce.class);
      }
    • 对于幂等的操作可以再次调用。

      调用 policy.shouldRetry() 判断是否需要执行重试操作

      1
      RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount, isIdempotentOrAtMostOnce);

      这里的 RetryPolicy 是默认值 FailoverOnNetworkExceptionRetry

      • 如果失败的次数己经超过最大的次数,就返回一个 RetryAction.RetryDecision.FAIL 的 RetryAction 表明调用失败。

        1
        2
        3
        4
        5
        if (failovers >= maxFailovers) {
        return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
        "failovers (" + failovers + ") exceeded maximum allowed ("
        + maxFailovers + ")");
        }
      • 如果抛出的异常是 ConnectionException、 NoRouteToHostException、 UnKnownHostException、 StandbyException、 RemoteException 中的一个,则说明底层的协议代理对象无法连接到 ActiveNamenode,或者 ActiveNamenode 宕机,或者 HDFS
        集群已经发生主从切换了。在这些情况下,就需要返回一个 RetryAction.RetryDecision.FAILOVER_AND_RETRY 的RetryAction,表明需要执行 performFailover()操作更新 Active Namenode 的引用。

        1
        2
        3
        4
        5
        6
        7
        8
        if (e instanceof ConnectException ||
        e instanceof NoRouteToHostException ||
        e instanceof UnknownHostException ||
        e instanceof StandbyException ||
        e instanceof ConnectTimeoutException ||
        isWrappedStandbyException(e)) {
        return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
        getFailoverOrRetrySleepTime(failovers));
      • 如果抛出的异常是 SocketException、 IOException 或者其他非 RemoteException 的异常,则无法判断这个 RPC 命令到底是不是执行成功了。可能是本地的 Socket 或者 IO 问题,也可能是 Namenode 端的 Socket 或者 IO 问题。这时就进行进一步的判断:-

        如果被调用的方法是 idempotent,也就是多次执行是没有副作用的,那么就连接另外一个底层代理重试;否则直接返回 RetryAction.RetryDecision.FAIL 表明调用失败。

        1
        2
        3
        4
        5
        6
        7
        8
        if (e instanceof SocketException
        || (e instanceof IOException && !(e instanceof RemoteException))) {
        if (isIdempotentOrAtMostOnce) {
        return RetryAction.FAILOVER_AND_RETRY;
        } else {
        return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "...");
        }
        }
    • 异常处理

      通过异常情况,获得了对应的 RetryAction 之后,就会在 proxyProvider 上调用 performFailover()方法更新 currentProxy。如果是 HA 配置,那么在 Namenode 主从切换后,performFailover() 会更新 currentProxy 到新的 Active Namenode,然后继续循环,
      这样在 currentProxy 上的调用就可以发送到新的 Active Namenode

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
      synchronized (proxyProvider) {
      if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
      proxyProvider.performFailover(currentProxy.proxy);
      proxyProviderFailoverCount++;
      } else {
      // LOG.warn("...");
      }
      currentProxy = proxyProvider.getProxy();
      }
      invocationFailoverCount++;
      }