Spark-源码学习-SparkCore-存储服务-磁盘-DiskStore-DiskBlockManager
一、概述
Block 数据存放在本地磁盘,每个 block 对应一个文件,文件名为 block 的 name 属性的值,DiskBlockManager 负责为逻辑的 Block 与数据写入磁盘的位置之间建立逻辑的映射关系~
二、实现
2.1. 结构
2.1.1. 属性
conf: SparkConf
deleteFilesOnStop: 停止 DiskBlockManager 的时候是否删除本地目录。
当不指定外部的 ShuffleClient(即 spark.shuffle.service.enabled 属性为 false) 或者当前实例是 Driver 时,此属性为 true
localDirs: 本地目录的数组,是 DiskBlockManager 管理的本地目录数组
1
private[spark] val localDirs: Array[File] = createLocalDirs(conf)
localDirs 是 DiskBlockManager 管理的本地目录数组。localDirs 通过调用 $createLocalDirs()$ 方法创建的本地目录数组,在每个路径下创建以
blockmgr-
为前缀,UUID为后缀的随机字符串的子目录。subDirsPerLocalDir: 磁盘存储 DiskStore 的本地子目录的数量,可以通过 spark.diskStore.subDirectories 属性配置,默认为64
subDirs: DiskStore的本地子目录的二维数组
1
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
shutdownHook: 初始化 DiskBlockManager 时,调用 $ShutdownHookManager.addShutdownHook()$ 方法,为 DiskBlockManager 设置好关闭钩子
2.2. 本地目录结构
2.3. 文件操作
获取文件: $getFile()$
根据 blockId 或者 filename 获取文件
- 调用 Utils 工具类的 $nonNegativeHash()$ 方法获取文件名的非负哈希值。
- 从 localDirs 数组中按照取余方式获得选中的一级目录。
- 哈希值除以一级目录的大小获得商,然后用商数与 subDirsPerLocalDir 取余获得的余数作为选中的二级目录。
- 获取二级目录。如果二级目录不存在,则需要创建二级目录。
- 返回二级目录下的文件。
$getAllFiles()$
获取本地 localDirs 目录中的所有文件
$getAllFiles()$ 为了保证线程安全,采用了同步 + 克隆的方式。
$createTempLocalBlock()$/$createTempShuffleBlock()$
这两个方法用来创建 Spark 计算过程中的中间结果以及 Shuffle Write 阶段输出的存储文件。它们的块 ID 分别用 TempLocalBlockId 和TempShuffleBlockId 来表示
$createLocalDirs()$
$createLocalDirs()$ 调用 $Utils.getConfiguredLocalDirs()$ 方法获取根目录,然后对每个根目录,调用 $Utils.createDirectory()$ 方法创建存储目录,所有磁盘存储的目录都组织在一起。
$Utils.createDirectory()$ 方法就会创建名称形如
blockmgr-[UUID.randomUUID]
的一级存储目录,但不会创建子目录。创建子目录的逻辑在 $getFile()$ 方法中,它除了名称所述的获取文件的功能外,会同时创建子目录~