一、概述

Hadoop 2.3.0 版本新增了集中式缓存管理 (Centralized Cache Management)功能,允许用户将一些文件和目录保存到 HDFS 缓存中。HDFS 集中式缓存是由分布在 Datanode 上的堆外内存组成的,并且由 Namenode 统一管理。

二、理论笔记

正在持续施工中ing~~~

HDFS 为管理员和用户提供了 hdfs cacheadmin 命令管理集中式缓存,包括缓存指令控制和缓存池控制两个部分。

  1. 缓存指令控制

    • 调用 hdfs cacheadmin -addDirective 命令缓存指定路径;

    • 调用 hdfs cacheadmin -removeDirective 命令删除指定 id 对应的缓存

    • 调用 hdfs cacheadmin -removeDirectives 命令删除指定路径的缓存

    • 调用 hdfs cacheadmin -listDirectives 命令显示当前所有的缓存。

  2. 缓存池控制

    • 调用 hdfs cacheadmin -addPool 命令创建一个缓存池
    • 调用 hdfs cacheadmin -modifyPool 命令修改一个缓存池的配置
    • 调用 hdfs cacheadmin -removePool 命令删除一个缓存池。

三、实现

用户可以通过 hdfs cacheadmin 命令或者 HDFS API 向 Namenode 发送缓存指令(CacheDirective),Namenode 的 CacheManager 类会将缓存指令保存到内存的指定数据结构(CacheManager 的 directivesById、 directivesByPath 字段)中,同时在 fsimage 和 editlog 文件中记录该缓存指令。之后 Namenode 的 CacheReplicationMonitor 类会周期性扫描命名空间和活跃的缓存指令,以确定需要缓存或删除缓存的数据块,并向 Datanode 分配缓存任务。

Namenode 还负责管理集群中所有 Datanode 的堆外缓存,Datanode 会周期性向 Namenode 发送缓存报告,而 Namenode 会通过心跳响应向 Datanode 下发缓存指令(添加缓存或者删除缓存)。
DFSClient 读取数据块时会向 Namenode 发送 ClientProtocol.getBlockLocations 请求获取数据块的位置信息,Namenode 除了返回数据块的位置信息外,还会返回该数据块的缓存信息,这样 DFSClient 就可以执行本地零拷贝读取缓存数据块了,从而提高了读取效率。

3.1. 相关类

cachemanager

3.1.1. 缓存块管理命令 CacheAdmin

CacheAdmin是 HDFS 中缓存块的管理命令。在 CacheAdmin 中的每个操作命令,最后通过 RPC 调用都会对应到 CacheManager 中的一个具体操作方法。

cacheadmin

AddCachePoolCommand_run

3.1.2. CacheManager

CacheManager 类是 Namenode 管理集中式缓存的核心组件,它管理着分布在 HDFS 集群中 Datanode 上的所有缓存数据块,同时负责响应 hdfs cacheadmin 命令或者 HDFS API 发送的缓存管理命令。

  1. 缓存指令 CacheDirective

    在 HDFS 中,最终缓存的本质上还是一个 INodeFile 文件,逻辑上 CacheDirective 是缓存的基本单元。

    CacheDirective 不一定是一个目录,也可以是一个文件

CacheDirective
  1. CachePool

    CachePool 中维护了一个 cacheDirective 缓存单元列表,CachePool 缓存池被 CacheManager 管理。

    CachePool
  2. CacheReplicationMonitor

    CacheReplicationMonitor 是一个线程类,它会在 Namenode 启动时以及以固定的间隔扫描命名空间和活跃的缓存指令,以确定需要缓存或删除缓存的数据块,之后向 Datanode 下发缓存指令(时间间隔是由 dfs.namenode.path.based.cache.refresh.interval.ms 配置项配置的,默认为 30 秒)。

    CacheManager 中还运行着一个很重要的服务,就是 CacheReplicationMonitor, 这个监控程序会周期扫描当前的最新的缓存路径,并分发到对应的 DataNode 节点上

3.2. 创建缓存池

客户端调用 ClientProtocol.addCachePool() 方法创建一个缓存池,NameNodeRpcServer 接收请求后,会将创建缓存池请求的参数封装到一个 CachePoolInfo 对象中(包括owner、 group、 mode、 limit、 maxTtl 等参数),然后调用 CacheManager.addCachePool() 方法响应这个请求。

CacheManager.addCachePool()

CacheManager.addCachePool() 方法首先会验证请求参数(info 变量保存)是否合法,然后检查 CacheManager.cachePools 集合中是否已经保存了这个缓存池。如果 CacheManager中没有这个缓存池的信息,则调用 CachePool.createFromInfoAndDefaults() 方法根据请求参数创建一个新的 CachePool 对象。

createFromInfoAndDefaults 方法会将请求中没有设置的参数用默认值补齐,然后构造 CachePool 对象。成功创建 CachePool 对象后,addCachePool() 方法会将 CachePool 对象放入 cachePools 字段中保存。至此,添加缓存池的操作完成。

3.2.1. 向缓存池添加缓存指令

成功创建缓存池之后,客户端就会调用 ClientProtocol.addCacheDirective() 方法将一个缓存指令添加到缓存中。NameNodeRpcServer 接收了这个请求后,会将创建缓存请求的参数封装到一个 CacheDirectiveInfo 对象中,然后调用 CacheManager.addInternal()方法响应这个请求。

CacheManager.addInternal() 会将缓存指令对象加入 directivesById 以及 directivesByPath 集合中保存,然后更新缓存池的统计信息。之后 addinternal() 方法会调用 setNeedsRescan() 方法触发 CacheReplicationMonitor 执行 rescan() 操作。

ClientProtocol.addCacheDirective

3.3. 扫描缓存指令

CacheReplicationMonitor 会循环调用 rescan() 方法执行扫描逻辑~

3.3.1. 遍历缓存指令

rescan() 调用 rescanCacheDirectives() 方法遍历所有的缓存指令(存储在 CacheManager.directivesByPath 字段中),并将缓存指令路径中包含的数据块加入到 CacheReplicationMonitor.cachedBlocks 集合中等待进一步操作。

3.3.2. 遍历 cachedBlocks

之后 rescan() 方法会调用 rescanCachedBloekMap() 遍历 CacheReplicationMonitor.cachedBlocks 集合,判断这些数据块是执行 cache 操作还是 uncache 操作。

  1. cache

    对于需要执行 cache 操作的数据块,rescanCachedBlockMap() 会调用 CacheReplicationMonitor.adaNewPendingCached() 方法为每个等待 cache 的数据块选择一个合适的 Datanode(从保存了该数据块副本的所有 Datanode 中挑选一个可用内存最多的),之后将该数据块加入该 Datanode 对应的 DatanodeDescriptor 对象的 pendingCached 列表中。

  2. uncache

    需要执行 uncache 操作的数据块,rescanCachedBlockMap() 会调用 CacheReplicationMonitor.addNewPendingUncached() 方法从缓存了该数据块的 Datanode 中随机选出一个节点执行 uncache 操作

    也就是将数据块加入该 Datanode 对应的 DatanodeDescriptor 对象的 pendingUncached 列表中。

    1
    2
    3
    4
    5
    6
    7
    8
    for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator(); iter.hasNext(); ) {
    DatanodeDescriptor datanode = iter.next();
    if (!cblock.isInList(datanode.getCached())) {
    LOG.trace("...");
    datanode.getPendingUncached().remove(cblock);
    iter.remove();
    }
    }

将数据块加入 DatanodeDescriptor 的 pendingCached 和 pendingUncached 列表中后,Namenode 就会在心跳处理流程中生成名字节点指令(请参考数据节点管理的名字节点指令生成小节),并通过心跳响应发送给 Datanode。Datanode 接受指令之后,会执行缓存以及删除缓存操作(请参考第4章的 FSDatasetImpl 小节)。至此,Namenode 端处理缓存数据块的逻辑就结束了。