一、概述

flink rpc client

二、请求程序

三、Stub 程序

3.1. 协议层

Flink rpc message 包定义了使用的协议类。

在 AkkaRpcActor 中主要创建了 RemoteHandshakeMessage(主要用于进行正式RPC通信之前的网络连接检测,保障 RPC 通信正常)、ControlMessages(用于控制 Akka系统,例如启动和停业Akka Actor等控制消息)等消息对应的处理器,此外还有集群运行时中 RPC 组件通信使用的 Message 类型。

image-20221227222823158

3.2. 代理层

3.2.1. RpcGateway 实现

RpcGateway 又叫作远程调用网关,是对外提供可调用的接口,所有实现 RPC 的组件都实现了此接口。

RpcGateway
  • JobMasterGateway 接口是 JobMaster 提供的对外服务接口
  • TaskExecutorGateway 是 TaskManager(其实现类是TaskExecutor)提供的对外服务接口
  • ResourceManagerGateway 是 ResourceManager 资源管理器提供的对外服务接口
  • DispatcherGateway 是 Flink 提供的作业提交接口。组件之间的通信行为都是通过 RpcGateway 进行交互的。

Fenced 消息专门用来解决集群脑裂问题,JobMaster、ResourceManager、DispatcherGateway 在高可用模式下,因为涉及 Leader 的选举,可能导致集群的脑裂问题,所以涉及选举的组件,都继承了 FencedRpcGateway。

3.2.2. 代理对象初始化

入口在 RpcService 的 connect(String address, Class clazz)方法。

  1. 获取 Actor 的引用 ActorRef

    1
    final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
  2. 发送握手消息

    1
    2
    3
    4
    5
    6
    7
    8
    final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
    (ActorRef actorRef) -> FutureUtils.toJava(Patterns.ask(actorRef, new RemoteHandshakeMessage(clazz,
    getVersion()),
    configuration.getTimeout().toMilliseconds()).<HandshakeSuccessMessage>mapTo(
    ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class
    )
    ))
    );
  3. 通过反射机制异步创建代理对象并返回

    1
    2
    3
    4
    5
    6
    7
    8
    return actorRefFuture.thenCombineAsync(handshakeFuture, (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
    InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
    ClassLoader classLoader = getClass().getClassLoader();
    @SuppressWarnings("unchecked")
    C proxy = (C) Proxy.newProxyInstance(classLoader, new Class<?>[]{clazz}, invocationHandler);
    return proxy;
    }, actorSystem.dispatcher());
    }

3.3. 路由层

RpoServer 中提供的 RpcGateway 接口方法,最终都会通过 AkkaInvocationHandler() 方法进行代理实现。
AkkaInvocationHandler 中根据在本地执行还是远程执行将代理方法进行区分。在触发和执行指定方法之前,先判断当前方法的 DeclaringClass 是否为基本实现接口,如果是则直接调用 method.invoke() 方法执行本地代理;如果不是,则调用 invokeRpc(method, args) 方法触发远程 RPC 接口的触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
Object result;
// 本地执行
if(declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass
.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass
.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if(declaringClass.equals(FencedRpcGateway.class)) {
// ...
} else {
// 远程 rpc 调用
result = invokeRpc(method, args);
}
return result;
}

3.3.1. 本地执行

通常情况下,RpcEndpoint 实现类除了调用指定服务组件的 RpcGateway 接口(DispatcherGateway、JobMasterGateway…)之外,其余的 RpcGateway 接口基本上都是本地调用和执行的。本地接口主要有 AkkaBasedEndpoint、 RpcGateway、StartStoppable、MainThreadExecutable 和 RpcServer 等,这些接口方法都是通过 AkkaInvocationHandler 代理类通过动态代理的方法实现的。

例如 ResourceManager 组件需要执行定时代码块时,会调用 RpcEndpoint.scheduleRunAsync() 方法,最终调用 AkkaInvocationHandler.scheduleRunAsync() 方法执行定时线程服务。

3.3.2. 远程 RPC 调用

另外一种方法是远程调用,此时会在 AkkaInvocationHandler 中创建 RpcInvocatioMessage,并发送 RpcInvocationMessage 到指定地址的远端进程中,远端的 RpcEndpoint 会接收 RpcInvocatioMessage 并通过 Akka 进行反序列化,然后调用底层的动态代理类实现进程内部的方法调用。

总结

AkkaInvocationHandler.invokeRpc()方法主要包含如下逻辑:

  1. 获取被调用的 RpcGateway 接口的 methodName、 parameterTypesparameterAnnotations 等参数信息,并基于这些参数调用 createRpcInvocationMessage() 方法创建 RpcInvocationMessage。

    1
    2
    3
    4
    5
    6
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    Annotation[][] parameterAnnotations = method.getParameterAnnotations();
    Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
    // 创建 RpcInvocation
    final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);

    RpcInvocation 结构中封装了RPC访问过程中使用到的关键信息,包括 MethodName、 ParameterType 以及 Args 等参数在 AkkaInvocationHandler.createRpcInvocationMessage() 方法中,AkkaInvocationHandler 首先判断 RpcEndpoint 是否为本地 Actor,然后根据 isLocal 是否为 True 将 RpcInvocation 分为 LocalRpcInvocation 和 RemoteRpcInvocation 两种类型。

    其中 LocalRpcInvocation 消息不需要做序列化,例如集群运行时中 Dispatcher 和 ResourceManager 在同一个进程中,此时会创建LocalRpcInvocation。RemoteRpcInvocation 则需要进行序列化处理,以保证跨网络节点的数据能够正常传输,主要用于 TaskExecutor和ResourceManager 之间的 RPC 通信等。

  2. 判断被调用的方法返回值是否为 Void 类型,如果是则直接调用 tell(rpcInvocation) 方法

    Akka 中的 tell()方法是没有返回值的

    1
    2
    3
    4
    5
    6
    7
    Class<?> returnType = method.getReturnType();
    final Object result;
    // 如果方法返回的是 Void 类型,则直接调用 tell(rpcInvocation) 方法
    if (Objects.equals(returnType, Void.TYPE)) {
    tell(rpcInvocation);
    result = null;
    }
  3. 如果被调用方法返回非 Void 类型,就会调用 ask(rpcInvocation, futureTimeout)方法创建 CompletableFuture,并判断 CompletableFuture 中的对象是否可以序列化,如果不能有效序列化则抛出异常。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 否则调用 Ask,执行异步调用
    CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
    // 对返回的数据进行处理,反序列化处理成 Object 类型数据并返回
    CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
    if (o instanceof SerializedValue) {
    try {
    return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
    } catch (IOException | ClassNotFoundException e) {
    throw new CompletionException("...");
    }
    }
    });

  4. 判断方法返回的 returnType 是否和 CompletableFuture 类型一致,如果是则将返回的结果设置为 completableFuture,说明接口本身就是异步的,即返回值为 CompletableFuture

    1
    2
    3
    4
    // 如果返回接口和方法的returnType一致,则直接返回
    if (Objects.equals(returnType, CompletableFuture.class)) {
    result = completableFuture;
    }
  5. 如果 returnType 不为 CompletableFuture 类型,则调用 completableFuture.get() 方法同步获取返回结果

    1
    result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());

3.4. 序列化层

Flink 远程 RemoteRpcInvocation 需要经过序列化

1
2
3
4
5
6
7
public  RemoteRpcInvocation(
final String methodName,
final Class<?>[] parameterTypes,
final Object[] args) throws IOException {
serializedMethodInvocation = new SerializedValue<>(new RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
methodInvocation = null;
}

四、传输层 Client

Akka 支持多种传输协议用于进程间的通信.单进程 Actor 系统默认协议是 akka:// 如果使用的远程或者集群,则通常会使用 akka.tcp:// 或者 akka.udp:// 在节点之间进行通信

五、总结

5.1. RPC调用流程

1. 通过客户端代理对象调用 RpcGateway 的方法会交由 invoke 方法执行。

2. invoke 将方法、参数信息封装为 RpcInvocation 对象,并通过 ActorRef 将消息发送给服务端 Actor。

如果执行的方法有返回值就使用Akka ask方式,否则以tell方式发送消息。 通过连接的服务端的地址可以判断出服务端在远程还是本地。 如果在远程,消息类型为RemoteRpcInvocation,实现了序列化接口,对象可序列化传输。(会判断methodName + parameterTypes + args序列化后的字节数是否超时指定的值,见参数akka.remote.netty.tcp.maximum-frame-size) 如果在本地,消息类型为LocalRpcInvocation。

3. 服务端Actor收到RpcInvocation消息,会从中获取到方法名、方法参数等相关信息,在主线程中通过反射的方式调用代理对象对应方法执行业务逻辑,如果方法有返回值,还会以tell方法告知客户端结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer{
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
Object result;
// 非Rpc方法,直接本地执行。这个是服务端通过自己的代理对象RpcServer调用自己非Rpc方法时走的逻辑
if (declaringClass.equals(AkkaBasedEndpoint.class)
|| declaringClass.equals(Object.class)
|| declaringClass.equals(RpcGateway.class)
|| declaringClass.equals(StartStoppable.class)
|| declaringClass.equals(MainThreadExecutable.class)
|| declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if (declaringClass.equals(FencedRpcGateway.class)) { // 支持HA的见FencedAkkaInvocationHandler
throw new UnsupportedOperationException(...);
} else {
// RPC方法,指RpcGateway子接口中定义的方法
// 接口:ResourceManagerGateway、DispatcherGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway
result = invokeRpc(method, args);
}

return result;
}
private Object invokeRpc(Method method, Object[] args) throws Exception {
...
// 1) 封装消息
final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);
// 2) 借助akka发送消息,进行RPC调用
Class<?> returnType = method.getReturnType();
final Object result;
if (Objects.equals(returnType, Void.TYPE)) {
// 无返回值,用akka tell模式
tell(rpcInvocation);
result = null;
} else {
...
// 有返回值,用akka ask模式
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
...
}
return result;
}

}