一、概述

Block 数据存放在本地磁盘,每个 block 对应一个文件,文件名为 block 的 name 属性的值,DiskBlockManager 负责为逻辑的 Block 与数据写入磁盘的位置之间建立逻辑的映射关系~

二、实现

2.1. 结构

2.1.1. 属性

  1. conf: SparkConf

  2. deleteFilesOnStop: 停止 DiskBlockManager 的时候是否删除本地目录。

    当不指定外部的 ShuffleClient(即 spark.shuffle.service.enabled 属性为 false) 或者当前实例是 Driver 时,此属性为 true

  3. localDirs: 本地目录的数组,是 DiskBlockManager 管理的本地目录数组

    1
    private[spark] val localDirs: Array[File] = createLocalDirs(conf)

    localDirs 是 DiskBlockManager 管理的本地目录数组。localDirs 通过调用 $createLocalDirs()$ 方法创建的本地目录数组,在每个路径下创建以 blockmgr- 为前缀,UUID为后缀的随机字符串的子目录。

  4. subDirsPerLocalDir: 磁盘存储 DiskStore 的本地子目录的数量,可以通过 spark.diskStore.subDirectories 属性配置,默认为64

  5. subDirs: DiskStore的本地子目录的二维数组

    1
    private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
  6. shutdownHook: 初始化 DiskBlockManager 时,调用 $ShutdownHookManager.addShutdownHook()$ 方法,为 DiskBlockManager 设置好关闭钩子

2.2. 本地目录结构

2.3. 文件操作

  1. 获取文件: $getFile()$

    根据 blockId 或者 filename 获取文件

    1. 调用 Utils 工具类的 $nonNegativeHash()$ 方法获取文件名的非负哈希值。
    2. 从 localDirs 数组中按照取余方式获得选中的一级目录。
    3. 哈希值除以一级目录的大小获得商,然后用商数与 subDirsPerLocalDir 取余获得的余数作为选中的二级目录。
    4. 获取二级目录。如果二级目录不存在,则需要创建二级目录。
    5. 返回二级目录下的文件。
  2. $getAllFiles()$

    获取本地 localDirs 目录中的所有文件

    $getAllFiles()$ 为了保证线程安全,采用了同步 + 克隆的方式。

  3. $createTempLocalBlock()$/$createTempShuffleBlock()$

    这两个方法用来创建 Spark 计算过程中的中间结果以及 Shuffle Write 阶段输出的存储文件。它们的块 ID 分别用 TempLocalBlockId 和TempShuffleBlockId 来表示

  4. $createLocalDirs()$

    $createLocalDirs()$ 调用 $Utils.getConfiguredLocalDirs()$ 方法获取根目录,然后对每个根目录,调用 $Utils.createDirectory()$ 方法创建存储目录,所有磁盘存储的目录都组织在一起。

    $Utils.createDirectory()$ 方法就会创建名称形如 blockmgr-[UUID.randomUUID] 的一级存储目录,但不会创建子目录。

    创建子目录的逻辑在 $getFile()$ 方法中,它除了名称所述的获取文件的功能外,会同时创建子目录~