一、概述

Blob 服务用来管理二进制文件(例如上传 job 的 jar 以及依赖的一些 jar,或者 TaskManager 上传的 log 文件等…)

当向 Flink 提交一个 Job 的时候,blob 服务端将会把用户的代码分发到所有工作节点TaskManager

  1. jar 包

    被 user classloader 使用的 jar 包

  2. 高负荷 RPC 消息

    • RPC 消息长度超出了 akka.framesize 的大小
    • 在 HA 摸式中,利用底层分布式文件系统分发单个高负荷 RPC 消息,比如: TaskDeploymentDescriptor,给多个接受对象。
    • 失败导致重新部署过程中复用RPC消息
  3. TaskManager 的日志文件

    为了在 web ui 上展示taskmanager的日志

二、实现

2.1. 架构设计

2.1.1. BlobStore

BLOB 底层存储支持多种实现 HDFS ,S3,FTP FTP 等,HA 中使用 BlobStore 进行文件的恢复。它有两个实现类,分别是 VoidBlobStore、 FileSystemBlobStore。

BlobStore
  1. VoidBlobStore 实现了 BlobStoreService 接口,它执行空操作
  2. FileSystemBlobStore 实现了 BlobStoreService,它的构造器要求传入filesystem 及 storagePath

2.1.2. BlobKey

BlobKey 是个抽象类,它有 key(byte[])、 type(BlobType)、 random(AbstractID) 三个属性,BlobType 分为 PERMANENT_BLOBTRANSIENT_BLOB ,其 createKey() 方法根据 BlobType 创建 BlobKey

  1. PERMANENT_BLOB
    生命周期和 job 的生命周期一致,并且是可恢复的。会上传到 BlobStore 分布式文件系统中。
  2. TRANSIENT_BLOB
    生命周期由用户自行管理,并且是不可恢复的。不会上传到 BlobStore 分布式文件系统中
BlobKey

2.1.3. BlobServer

BlobServer 负责接收请求,启动线程处理,并负责创建目录存储Blob和临时文件。主要方法是处理文件的存入和读取,文件类型分为永久和临时,永久类型的文件会存入 BlobStore,而临时文件只会存入本地的临时目录。

2.1.4. BlobClient

Blob 客户端

2.1.5. BlobCache

BlobCache,是用于在 TaskExecutor 中维护 Blob 的缓存,以免多次请求下载

2.1.6. LibraryCacheManager

LibraryCacheManager 用来缓存下载得到的目标 jar 包,具体实现类为 BlobLibraryCacheManager。
libraryCacheManager.registerJob(),向 CacheManager 进行注册,请求缓存
调用 BlobLibraryCacheManager 的 registerJob 创建并缓存该 job 的 classloader

1
2
3
4
5
6
7
return new JobManagerRunnerImpl(
jobGraph,
jobMasterFactory,
highAvailabilityServices,
jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
jobManagerServices.getScheduledExecutorService(),
fatalErrorHandler);

TaskManager在执行Task时,首先会调用LibraryCacheManager的registerTask从BlobServer下载相应的jar包并创建
Classloader

1
2
3
4
5
6
private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<>();	
public ClassLoaderLease registerClassLoaderLease(JobID jobId) {
synchronized(lockObject) {
return cacheEntries.computeIfAbsent(jobId, jobID -> new LibraryCacheEntry(jobId)).obtainLease();
}
}

首先执行 lilbraryCacheManager.registerClassLoaderLease(),向 CacheManager 进行注册

1
2
3
4
5
6
7
try {
userCodeLoader = classLoaderLease.getOrResolveClassLoader(
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());
} catch (IOException e) {
throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
}

然后执行 getOrResolveClassLoader() 来加载用户的代码加载器

三、jar 包管理

3.1. Standalone

JobManager 启动时会创建 BlobStore、BlobServer、BlobLibraryCacheManager, TaskManager 注册到 JobManager 后会创建 BlobCacheService、 BlobLibraryCacheManager。JobClient 在向集群提交 job 的过程中会调用 JobSubmissionClientActor 的 tryToSubmitJob() 方法进而调用 JobGraph 对象的 uploadUserJars() 方法

uploadAndSetUserJars