Flink-源码学习-FlinkCore-公共基础服务-Blob 服务
一、概述
Blob 服务用来管理二进制文件(例如上传 job 的 jar 以及依赖的一些 jar,或者 TaskManager 上传的 log 文件等…)
当向 Flink 提交一个 Job 的时候,blob 服务端将会把用户的代码分发到所有工作节点TaskManager
1.1. Flink 中支持的 BLOB 文件类型
jar 包
被 user classloader 使用的 jar 包
高负荷 RPC 消息
- RPC 消息长度超出了
akka.framesize
的大小 - 在 HA 摸式中,利用底层分布式文件系统分发单个高负荷 RPC 消息,比如: TaskDeploymentDescriptor,给多个接受对象。
- 失败导致重新部署过程中复用RPC消息
- RPC 消息长度超出了
TaskManager 的日志文件
为了在 web ui 上展示taskmanager的日志
二、实现
2.1. 架构设计
2.1.1. BlobStore
BLOB 底层存储支持多种实现 HDFS ,S3,FTP FTP 等,HA 中使用 BlobStore 进行文件的恢复。它有两个实现类,分别是 VoidBlobStore、 FileSystemBlobStore。
- VoidBlobStore 实现了 BlobStoreService 接口,它执行空操作
- FileSystemBlobStore 实现了 BlobStoreService,它的构造器要求传入filesystem 及 storagePath
2.1.2. BlobKey
BlobKey 是个抽象类,它有 key(byte[])、 type(BlobType)、 random(AbstractID) 三个属性,BlobType 分为 PERMANENT_BLOB
及 TRANSIENT_BLOB
,其 createKey() 方法根据 BlobType 创建 BlobKey
PERMANENT_BLOB
生命周期和 job 的生命周期一致,并且是可恢复的。会上传到 BlobStore 分布式文件系统中。TRANSIENT_BLOB
生命周期由用户自行管理,并且是不可恢复的。不会上传到 BlobStore 分布式文件系统中
2.1.3. BlobServer
BlobServer 负责接收请求,启动线程处理,并负责创建目录存储Blob和临时文件。主要方法是处理文件的存入和读取,文件类型分为永久和临时,永久类型的文件会存入 BlobStore,而临时文件只会存入本地的临时目录。
2.1.4. BlobClient
Blob 客户端
2.1.5. BlobCache
BlobCache,是用于在 TaskExecutor 中维护 Blob 的缓存,以免多次请求下载
2.1.6. LibraryCacheManager
LibraryCacheManager 用来缓存下载得到的目标 jar 包,具体实现类为 BlobLibraryCacheManager。
libraryCacheManager.registerJob(),向 CacheManager 进行注册,请求缓存
调用 BlobLibraryCacheManager 的 registerJob 创建并缓存该 job 的 classloader
1 | return new JobManagerRunnerImpl( |
TaskManager在执行Task时,首先会调用LibraryCacheManager的registerTask从BlobServer下载相应的jar包并创建
Classloader
1 | private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<>(); |
首先执行 lilbraryCacheManager.registerClassLoaderLease(),向 CacheManager 进行注册
1 | try { |
然后执行 getOrResolveClassLoader() 来加载用户的代码加载器
三、jar 包管理
3.1. Standalone
JobManager 启动时会创建 BlobStore、BlobServer、BlobLibraryCacheManager, TaskManager 注册到 JobManager 后会创建 BlobCacheService、 BlobLibraryCacheManager。JobClient 在向集群提交 job 的过程中会调用 JobSubmissionClientActor 的 tryToSubmitJob() 方法进而调用 JobGraph 对象的 uploadUserJars() 方法