一、概述

对于 7x24 小时不间断运行的流式系统来讲,保证集群核心组件的高可用是非常重要的。Flink 集群在运行时中主要通过 HighAvailabilityServices 公共基础服务来创建和初始化核心组件的高可用服务

二、 HighAvailabilityServices

当启动集群运行时的时候,会初始化 HighAvailabilityServices 高可用基础服务实现。
ClusterEntrypoint 会在 initializeServices() 方法中调用 createHaServices() 方法创建 HighAvailabilityServices。

1
haServices = createHaServices(configuration, ioExecutor);

HighAvailabilityServices 主要分为 AbstractNonHaServices 和 ZooKeeperHaServices 两种类型,

HighAvailabilityServices

  1. AbstractNonHaServices 实际上不提供高可用能力
  2. ZooKeeperHaServices 通过 ZooKeeper 实现集群组件服务的高可用。

HighAvailabilityServices 基础服务提供了运行时中所有组件高可用服务需要的方法以及创建 LeaderRetrievalService 和LeaderElectionService 服务的方法。

2.1. LeaderRetrievalService

LeaderRetrievalService 主要用于获取集群中当前活动的 Leader 节点

例如在 TaskExecutor中,需要获取当前集群中 ResourceManager 组件的有效地址,防止因为 ResourceManager 出现故障而导致集群异常。此时 LeaderRetrievalService 负责监听开启高可用服务的组件,如果集群中 ResourceManager 组件的 LeaderShip 发生了切换,会通过监听器通知 TaskExecutor 组件,从而保证整个集群的稳定运行。

2.2. LeaderElectionService

LeaderElectionService 实现了从服务竞选者中选举新 Leader 的功能,且同一时间有效的 Leader 节点有且仅有一个,获得 Leader 角色的组件才可以对外提供服务。

三、实现

3.1. 基于 ZooKeeper 高可用实现

Standalone 模式下的 HA 配置,依赖 Zookeeper 实现。Zookeeper 集群独立于 Flink 集群之外,主要被用来进行 Leader 选举和轻量级状态的一致性存储。
Standalone 模式下实现了 ZooKeeperLeaderRetrievalService 和 ZooKeeperLeaderElectionService,提供了基于 ZooKeeper 实现的 LeaderRetrievalService 和 LeaderElectionService。

3.1.1. 基于 Curator 椻架与 ZooKeeper 系统通信

在 Flink 中使用 Curator 框架实现与 Zookeeper 集群之间的通信。Curator 是开源的一套 ZooKeeper 客户端框架,屏蔽了 Zookeeper 客户端底层的细节开发工作,例如连接重连和异常处理等。

3.1.2. 基于 LeaderElectionService 实现 Leader 竞选

通过 LeaderElectionService 可以对 LeaderContender 进行选举。并产生新的 Leader 对外提供服务在 Flink 集群运行时中实现 LeaderContender 接口的组件主要有 ResourceManager、DefaultDispatcherRunner、WebMonitorEndpoint 和 JobManagerRunnerImpl这些组件都可以通过 LeaderElectionService 在竞选者之间选择可用的 Leader 服务。

1
ZooKeeperLeaderElectionService.start()

ZooKeeperLeaderElectionService 除了实现 LeaderElectionService 接口之外,还实现了 CuratorFramework 中的三个接口:

ZooKeeperLeaderElectionService

  1. LeaderLatchListener

    其中 LeaderLatchListener 用于监听当前的 LeaderContender 是否为 Leader

    ZooKeeperLeaderElectionService 类是 LeaderLatchListener 的子类,所以当该组件在进行选举如果成功的话则会自动调用 isLeader() 方法,否则调用 notLeader() 方法。这是 ZooKeeper 的 API 框架 cruator 的机制

  2. NodeCacheListener

    NodecacheListener 用于监听节点的创建、更新、删除,并将节点的数据缓存在本地。

  3. UnhandledErrorListener

3.1.3.基于 LeaderRetrievalService 获取 Leader 节点

LeaderRetrievalService 接口主要提供了start() 和 stop() 两个方法。

LeaderRetrievalService

启动 LeaderRetrievalService 服务时,参数 LeaderRetrievalListener 监听目标组件 Leader 地址切换,一旦被监听组件的地址发生变化,调用 LeaderRetrievalListener.notifyLeaderAddress()方法通知监听者最新的组件 Leader 节点地址。
在集群运行时中,LeaderRetrievalService 的主要应用场景如下。

  1. JobMaster/TaskExecutor 获取 ResourceManager Leader 连接地址。
  2. TaskExecutor 通过 JobLeaderService 获取 Job 的 Leader 地址。
  3. WebMonitorEndpoint 获取 DispatcherGateway 和 ResourceManagerGateway 的 Leader 地址

3.2. 基于 Kubernetes 高可用实现