Flink-源码学习-通信服务-Flink RPC 设计-客户端
一、概述
二、请求程序
三、Stub 程序
3.1. 协议层
Flink rpc message 包定义了使用的协议类。
在 AkkaRpcActor 中主要创建了 RemoteHandshakeMessage(主要用于进行正式RPC通信之前的网络连接检测,保障 RPC 通信正常)、ControlMessages(用于控制 Akka系统,例如启动和停业Akka Actor等控制消息)等消息对应的处理器,此外还有集群运行时中 RPC 组件通信使用的 Message 类型。
3.2. 代理层
3.2.1. RpcGateway 实现
RpcGateway 又叫作远程调用网关,是对外提供可调用的接口,所有实现 RPC 的组件都实现了此接口。
- JobMasterGateway 接口是 JobMaster 提供的对外服务接口
- TaskExecutorGateway 是 TaskManager(其实现类是TaskExecutor)提供的对外服务接口
- ResourceManagerGateway 是 ResourceManager 资源管理器提供的对外服务接口
- DispatcherGateway 是 Flink 提供的作业提交接口。组件之间的通信行为都是通过 RpcGateway 进行交互的。
Fenced 消息专门用来解决集群脑裂问题,JobMaster、ResourceManager、DispatcherGateway 在高可用模式下,因为涉及 Leader 的选举,可能导致集群的脑裂问题,所以涉及选举的组件,都继承了 FencedRpcGateway。
3.2.2. 代理对象初始化
入口在 RpcService 的 connect(String address, Class
获取 Actor 的引用 ActorRef
1
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
发送握手消息
1
2
3
4
5
6
7
8final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
(ActorRef actorRef) -> FutureUtils.toJava(Patterns.ask(actorRef, new RemoteHandshakeMessage(clazz,
getVersion()),
configuration.getTimeout().toMilliseconds()).<HandshakeSuccessMessage>mapTo(
ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class
)
))
);通过反射机制异步创建代理对象并返回
1
2
3
4
5
6
7
8return actorRefFuture.thenCombineAsync(handshakeFuture, (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
ClassLoader classLoader = getClass().getClassLoader();
C proxy = (C) Proxy.newProxyInstance(classLoader, new Class<?>[]{clazz}, invocationHandler);
return proxy;
}, actorSystem.dispatcher());
}
3.3. 路由层
RpoServer 中提供的 RpcGateway 接口方法,最终都会通过 AkkaInvocationHandler() 方法进行代理实现。
AkkaInvocationHandler 中根据在本地执行还是远程执行将代理方法进行区分。在触发和执行指定方法之前,先判断当前方法的 DeclaringClass 是否为基本实现接口,如果是则直接调用 method.invoke() 方法执行本地代理;如果不是,则调用 invokeRpc(method, args) 方法触发远程 RPC 接口的触发。
1 | public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
3.3.1. 本地执行
通常情况下,RpcEndpoint 实现类除了调用指定服务组件的 RpcGateway 接口(DispatcherGateway、JobMasterGateway…)之外,其余的 RpcGateway 接口基本上都是本地调用和执行的。本地接口主要有 AkkaBasedEndpoint、 RpcGateway、StartStoppable、MainThreadExecutable 和 RpcServer 等,这些接口方法都是通过 AkkaInvocationHandler 代理类通过动态代理的方法实现的。
例如 ResourceManager 组件需要执行定时代码块时,会调用 RpcEndpoint.scheduleRunAsync() 方法,最终调用 AkkaInvocationHandler.scheduleRunAsync() 方法执行定时线程服务。
3.3.2. 远程 RPC 调用
另外一种方法是远程调用,此时会在 AkkaInvocationHandler 中创建 RpcInvocatioMessage,并发送 RpcInvocationMessage 到指定地址的远端进程中,远端的 RpcEndpoint 会接收 RpcInvocatioMessage 并通过 Akka 进行反序列化,然后调用底层的动态代理类实现进程内部的方法调用。
AkkaInvocationHandler.invokeRpc()方法主要包含如下逻辑:
获取被调用的 RpcGateway 接口的 methodName、 parameterTypesparameterAnnotations 等参数信息,并基于这些参数调用 createRpcInvocationMessage() 方法创建 RpcInvocationMessage。
1
2
3
4
5
6String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
// 创建 RpcInvocation
final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);RpcInvocation 结构中封装了RPC访问过程中使用到的关键信息,包括 MethodName、 ParameterType 以及 Args 等参数在 AkkaInvocationHandler.createRpcInvocationMessage() 方法中,AkkaInvocationHandler 首先判断 RpcEndpoint 是否为本地 Actor,然后根据
isLocal
是否为 True 将 RpcInvocation 分为 LocalRpcInvocation 和 RemoteRpcInvocation 两种类型。其中 LocalRpcInvocation 消息不需要做序列化,例如集群运行时中 Dispatcher 和 ResourceManager 在同一个进程中,此时会创建LocalRpcInvocation。RemoteRpcInvocation 则需要进行序列化处理,以保证跨网络节点的数据能够正常传输,主要用于 TaskExecutor和ResourceManager 之间的 RPC 通信等。
判断被调用的方法返回值是否为 Void 类型,如果是则直接调用 tell(rpcInvocation) 方法
Akka 中的 tell()方法是没有返回值的
1
2
3
4
5
6
7Class<?> returnType = method.getReturnType();
final Object result;
// 如果方法返回的是 Void 类型,则直接调用 tell(rpcInvocation) 方法
if (Objects.equals(returnType, Void.TYPE)) {
tell(rpcInvocation);
result = null;
}如果被调用方法返回非 Void 类型,就会调用 ask(rpcInvocation, futureTimeout)方法创建 CompletableFuture,并判断 CompletableFuture 中的对象是否可以序列化,如果不能有效序列化则抛出异常。
1
2
3
4
5
6
7
8
9
10
11
12
13// 否则调用 Ask,执行异步调用
CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
// 对返回的数据进行处理,反序列化处理成 Object 类型数据并返回
CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
if (o instanceof SerializedValue) {
try {
return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new CompletionException("...");
}
}
});
判断方法返回的 returnType 是否和 CompletableFuture 类型一致,如果是则将返回的结果设置为 completableFuture,说明接口本身就是异步的,即返回值为 CompletableFuture
1
2
3
4// 如果返回接口和方法的returnType一致,则直接返回
if (Objects.equals(returnType, CompletableFuture.class)) {
result = completableFuture;
}如果 returnType 不为 CompletableFuture 类型,则调用 completableFuture.get() 方法同步获取返回结果
1
result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
3.4. 序列化层
Flink 远程 RemoteRpcInvocation 需要经过序列化
1 | public RemoteRpcInvocation( |
四、传输层 Client
Akka 支持多种传输协议用于进程间的通信.单进程 Actor 系统默认协议是 akka://
如果使用的远程或者集群,则通常会使用 akka.tcp://
或者 akka.udp://
在节点之间进行通信
五、总结
5.1. RPC调用流程
1. 通过客户端代理对象调用 RpcGateway 的方法会交由 invoke 方法执行。
2. invoke 将方法、参数信息封装为 RpcInvocation 对象,并通过 ActorRef 将消息发送给服务端 Actor。
如果执行的方法有返回值就使用Akka ask方式,否则以tell方式发送消息。 通过连接的服务端的地址可以判断出服务端在远程还是本地。 如果在远程,消息类型为RemoteRpcInvocation,实现了序列化接口,对象可序列化传输。(会判断methodName + parameterTypes + args序列化后的字节数是否超时指定的值,见参数akka.remote.netty.tcp.maximum-frame-size) 如果在本地,消息类型为LocalRpcInvocation。
3. 服务端Actor收到RpcInvocation消息,会从中获取到方法名、方法参数等相关信息,在主线程中通过反射的方式调用代理对象对应方法执行业务逻辑,如果方法有返回值,还会以tell方法告知客户端结果。
1 | class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer{ |