一、概述

flink rpc server

二、传输层 Server

Akka 中 创建 actor 需要继承 AbstractActor 类并重写它的初始行为方法 createReceive(),actor 接收消息后会触发 createReceive() 方法被调用,通过 receiveBuilder() 来接收消息以及它的类型,从而判断该如何处理消息。

1
2
3
4
5
6
public Receive createReceive() {
return ReceiveBuilder.create()
.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
.match(ControlMessages.class, this::handleControlMessage)
.matchAny(this::handleMessage).build();
}

AkkaRpcActor 接收到的消息总共有3种。

  1. 握手消息

在客户端构造时会通过 ActorSelection 发送过来。收到消息后会检查接口、版本是否匹配,如果一致就返回成功

  1. 控制消息

    在 RpcEndpoint 调用 start 方法后,会向自身发送一条 Processing START 消息来转换当前 Actor 的状态为 STARTED,STOP 也类似,并且只有在 Actor 状态为 STARTED 时才会处理 RPC 请求

  2. RPC消息
    通过解析 RpcInvocation 获取方法名和参数类型,并从 RpcEndpoint 类中找到 Method 对象,通过反射调用该方法。如果有返回结果,会以Akka 消息的形式发送回发送者。

三、Stub 程序

3.1. 序列化层

3.2. 协议层

Flink rpc message 包定义了使用的协议类。

在 AkkaRpcActor 中主要创建了 RemoteHandshakeMessage(主要用于进行正式RPC通信之前的网络连接检测,保障 RPC 通信正常)、ControlMessages(用于控制 Akka系统,例如启动和停业Akka Actor等控制消息)等消息对应的处理器,此外还有集群运行时中 RPC 组件通信使用的 Message 类型。

image-20221227222823158

3.3. 路由层

3.3.1. akka 路由实现

在 RpcEndpoint 中创建的 RemoteRpcInvocation 消息,RPC 消息通过 RpcEndpoint 所绑定的 Actor 的 ActorRef 发送,最终会通过 Akka 系统传递到被调用方。

例如 TaskExecutor 向 ResourceManager 发送 SlotReport 请求的时候,会在 TaskExecutor 中将 ResourceManagerGateway 的方法名称和参数打包成 RemoteRpcInvocation 对象。然后经过网络发送到 ResourceManager 中的 AkkaRpcActor,在 ResourceManager 本地执行具体的方法

3.3.2. Java 反射机制

在 AkkaRpcActor.handleMessage() 方法中,最终会调用 handleRpcMessage() 方法继续对 RPC 消息进行处理。根据传入的 RPC 消息进行判别,确定消息是否为 RunAsync、CallAsync 以及 RpcInvocation 等对象类型。如果是 RunAsync 或 CallAsync 等线程实现,则直接调用handleRunAsync() 或 handleCallAsync() 方法将代码块提交到本地线程池中执行。对于RpcInvocation类型的消息,则会调用handleRpcInvocation()方法,通过解析 RpcInvocation 获取方法名和参数类型,并从 RpcEndpoint 类中找到 Method 对象,通过反射调用该方法。如果有返回结果,会以 Akka 消息的形式发送回发送者。

1
2
3
4
5
6
7
8
9
10
11
protected void handleRpcMessage(Object message) {
if (message instanceof RunAsync) {
handleRunAsync((RunAsync) message);
} else if (message instanceof CallAsync) {
handleCallAsync((CallAsync) message);
} else if (message instanceof RpcInvocation) {
handleRpcInvocation((RpcInvocation) message);
} else {
sendErrorIfSender("");
}
}

AkkaRpcActor.handleRpcInvocation()方法主要包含如下逻辑:

  1. 从 RpcInvocation 对象中获取调用的 methodName 以及相应的 parameterTypes 参数信息,然后调用 lookupRpcMethod() 方法判断当前的 RpcEndpoint 是否实现了指定的 Method 名称

    例如JobMaster调用ResourceManagerGateway.requestSlot() 方法,会在 lookupRpcMethod() 方法中判断当前 ResourceManager 实现的 Endpoint 是否提供了该方法的实现。

  2. 当 rpcMethod 不为空时,首先调用 rpcMethod.setAccessible(true) 支持匿名类的定义操作,然后判断 rpcMethod 返回类型是否为Void,如果是则调用 rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()) 触发执行方法,此时不会返回任何返回值。

  3. 如果 rpcMethod 返回类型非 Void,则会调用 rpcMethod.invoke() 触发调用和执行方法,同时获取方法的返回值并赋值给 Object result对象。

  4. 判断 result 是否为 CompletableFuture 类型,如果是则将返回结果转换为 CompletableFuture 对象,然后调用 sendAsyncResponse() 方法通过Akka系统将RpcMethod返回值返回给调用方。

  5. 如果 result 不为 CompletableFuture 类型,则直接调用 sendSyncResponse() 方法将结果返回给调用方。

四、响应程序

在 Flink 中,RpcEndpoint是一个通用的可以对外通讯的基类,Flink 中所有提供远程调用服务的组件(Dispatcher、 JobMaster、ResourceManager、 TaskExecutor等)都继承自 RpcEndpoint 在实例化 RpcEndpoint 时,会创建其相关的通讯 Actor,并且基于 Actor 创建 AkkaInvocationHandler,然后利用动态 Proxy 生成一个 RpcServer 对象,这个 RpcServer 对象就是这个 RpcEndpoint 对外通讯的服务端代理了,通过该代理可以实现start,stop,getAddress等具体行为。
RpcEndpoint 的核心组件是 RpcServer,其 start(), stop(), getSelfGateway(), getAddress() 等绝大部份的方法实现都委托给了
RpcServer 来实现,而 RpcServer 是通过 RpcService 来进行构建,在 Flink中 Rpcservice 的实现是 AkkaRpcService

五、总结