一、概述

HDFS是一个主从架构,NameNode是主节点,DataNode从节点,DataNode启动时需要和NameNode进行注册

1.1. 相关类

1.1.1. DataStorage

Datanode 最重要的功能就是管理磁盘上存储的 HDFS 数据块。Datanode 将这个管理功能切分为两个部分:

  • 管理与组织磁盘存储目录(由 dfs.data.dir 指定),如 current、 previous、 tmp 等,这个功能由 DataStorage 类实现

    如图:Data Storage 的父类为 Storage,Storagelnfo 为根接口,描述存储的基本信息。子类 Storage 是抽象类,它为 Datanode、 Namenode 提供抽象的存储服务。

    一个 Storage 可以定义多个存储目录,存储目录由 Storage 的内部类 StorageDirectory 描述,StorageDirectory 类定义了存储目录上的通用操作。如下配置所示,Datanode 定义了六个数据存储目录,HDFS 会使用一个 DataStorage 对象管理整个Datanode 的存储,而这六个存储目录则由两个 StorageDirectory 对象管理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <property>
    <name>dfs.datanode.data.dir</name>
    <value>file://${hadoop.tmp.dir}/dfs/data,
    file://${hadoop.tmp.dir}/dfs2/data,
    file://${hadoop.tmp.dir}/dfs3/data,
    file://${hadoop.tmp.dir}/dfs4/data,
    file://${hadoop.tmp.dir}/dfs5/data,
    file://${hadoop.tmp.dir}/dfs6/data
    </value>
    </property>
  • 管理与组织数据块及其元数据文件,这个功能主要由 FsDatasetlmpl 相关类实现

二、源码分析

看源码啦~~~

connectToNNAndHandshake()方法用于与 Namenode 握手,初始化 Datanode 存储并注册当前 Datnaode。connectToNNAndHandshake()方法的操作分为以下 4个步骤

1
connectToNNAndHandshake();

2.1. 获取 Namenode 的 RPC 代理。

connectToNN() 构造 DatanodeProtocolClientSideTranslatorPB 引用作为 Datanode 向 Namenode 发起 RPC 调用的句柄。

1
bpNamenode = dn.connectToNN(nnAddr); //get NN proxy

2.2. 第一阶段握手

通过调用 RPC 方法 DatanodeProtocol.versionRequest(),获取当前块池对应的命名空间的信息 NamespaceInfo 。

1
NamespaceInfo nsInfo = retrieveNamespaceInfo();

NamespaceInfo 包含了一些存储管理相关的信息,数据节点的后续处理,如版本检查、注册,都需要使用 NamespaceInfo 中的信息

调用 RPC 方法 DatanodeProtocol.versionRequest() 获取命名空间的信息,然后调用checkNNVersion()方法验证 Namenode 版本与当前 Datanode 版本的兼容性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
NamespaceInfo retrieveNamespaceInfo() throws IOException {
NamespaceInfo nsInfo = null;
while (shouldRun()) {
try {
//返回的 NamespaceInfo 里面有blockPoolID
nsInfo = bpNamenode.versionRequest();
break;
}
}
// ...
if (nsInfo != null) {
checkNNVersion(nsInfo);
}
// ...
return nsInfo;
}

2.3. 初始化命名空间对应块池

确认获取的命名空间信息与命名空间中另一个 Namenode 获取的信息匹配。同时,如果这是第一个 Namenode 的连接,则在 Datanode 上初始化命名空间对应块池的存储

1
bpos.verifyAndSetNamespaceInfo(nsInfo);

verifyAndSetNamespacelnfo() 方法会首先对 bpNSInfo 赋值,然后调用 Datanode.initBlockPool() 初始化命名空间对应块池的本地存储。如果这个字段不为空,则命名空间中定义的另一个 Namenode 已经提前注册并且初始化了本地存储,这时只需要将当前注册信息与己有的注册信息比较,确认 blockPoolld、 namespaceld、 clusterld等字段相等即可。

2.3.1. 首次连接 NameNode

BPOfferService 通过 bpNSInfo 字段保存当前块池对应的命名空间信息。如果这个字段为空,则证明当前 BPServiceActor 对应 Namenode 是第一个响应握手请求的 Namenode。 如果是第一次连接 NameNode,则初始化 BlockPool 块池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (this.bpNSInfo == null) {
this.bpNSInfo = nsInfo;
boolean success = false;
// 如果是第一次连接 NameNode 初始化 BlockPool 块池
try {
dn.initBlockPool(this);
success = true;
} finally {
if (!success) {
// 如果初始化失败,则将 bpNSInfo 置为空,等待下一个 Namenode 的响应
this.bpNSInfo = null;
}
}
}
  1. 添加块池

    BlockPoolManager 用于管理 DataNode上所有的块池。 BlockPoolManager 会持有多个 BPOfferService 对象,每个 BPOfferService 对象都封装了对单个块池操作的 API。

  2. 初步初始化存储结构

    第一次握手,就是注册存储数据块的集合,并且根据需求来判断是否需要数据回滚和数据恢复

    • 在StorageLocation不为空的情况下,根据需求进行回滚或者恢复数据

      初始化DataStorage

      DataNode一个重要的功能就是管理磁盘存储的数据块,DataNode将这个功能切分为2个部分:管理与组织磁盘目录,由DataStorage实现

      1
      2
      3
      4
      synchronized (this) {
      //在 StorageLocation 不为空的情况下,根据需求进行回滚或者恢复数据
      storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
      }
    • 给 DataNode 生成新的 uuid

    • 初始化 FsDatasetImpl

      BlockPoolSlice 负责管理单个存储目录下单个块池的所有数据块;FsVolumeImpl 则负责管理一个完整的存储目录下所有的数据块,也就包括了这个存储目录下多个 BlockPoolSlice 对象的引用。Datanode 可以定义多个存储目录,也就是定义多个 FsVolumeImpl 对象,在 HDFS 中使用 FsVolumeList 对象统一管理 Datanode 上定义的多个 FsVolumelmpl 对象。FsDatasetImpl 负责管理 Datanode 上所有数据块功能的类

      HADOOP-文件系统数据集_FsDatasetImpl

  3. 检查磁盘损坏,然后从 DataNode 中移除有问题的目录

    1
    checkDiskError();
  4. 启动扫描器 DirectoryScanner

    DirectoryScanner 的主要任务是定期扫描磁盘上的数据块,检查磁盘上的数据块信息是否与 FsDatasetlmpl 中保存的数据块信息一致,如果不一致则对 FsDatasetImpl中的信息进行更新。

    注意,DirectoryScanner 只会检查内存和磁盘上 FINALIZED 状态的数据块是否一致。

    1
    initDirectoryScanner(conf);

    DirectoryScanner 对象会定期(由 dfs.datanode.directoryscan.interval 配置,默认为 21600秒)在线程池对象 masterThread 上触发扫描任务,这个扫描任务是由 DirectoryScanner.reconcile()方法执行的。reconcile()会首先调用 scan()方法收集磁盘上数据块与内存中数据块的差异信息,并把这些差异信息保存在 diffs 字段中。

    scan()方法在获取磁盘上存储的数据块时使用了reportCompileThreadPool 线程池,异步地完成磁盘数据块的扫描任务。

    reconcile()方法拿到 scan()更新的 diffs 对象后,调用 FsDataset 的 checkAndUpdate()方法更新 FsDatasetlmpl 保存的数据块副本信息,完成与磁盘上数据块副本的同步操作。

  5. 将 BlockPool 添加到 FsDatasetIpml,并继续初始化存储结构

    1
    data.addBlockPool(nsInfo.getBlockPoolID(), conf);

    FsDatasetImpl#addBlockPool() 操作的主要对象具体到了各 BlockPool,完成blockpool、current、rbw、tmp等目录的检查、恢复或初始化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public void addBlockPool(String bpid, Configuration conf) throws IOException {
    synchronized(this) {
    // 添加 BlockPool
    volumes.addBlockPool(bpid, conf);
    // 初始化 ReplicaMap 中 BlockPool 的映射
    volumeMap.initBlockPool(bpid);
    }
    // 将所有副本加载到 FsDatasetImpl#volumeMap 中
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
    }

2.3.2. 不是首次连接(刷新)

需要将当前注册信息与己有的注册信息比较,确认 blockPoolld、 namespaceld、 clusterld等字段相等即可。

2.4. 第二阶段握手

调用 BPServiceActor.register() 方法向 Namenode 注册当前 Datanode。

1
register(nsInfo);

第二阶段的握手,调用 DatanodeProtocol.registerDatanode()方法在 Namenode 上注册当前 Datanode。注册成功后调用 BPOfferService.registrationSucceeded() 方法确认 namespaceld、clusterld 与命名空间中定义的另一个 Namenode 返回的信息一致,同时确认 DatanodeUUid 与 Datanode 本地存储一致。

2.4.1. 注册 Datanode

1
2
3
4
5
6
7
8
9
10
11
12
13
while (shouldRun()) {
try {
// 调用 DatanodeProtocol.registerDatanode() 注册 Datanode
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
bpRegistration.setNamespaceInfo(nsInfo);
break;
} catch(EOFException e) { // namenode might have just restarted
sleepAndLogInterrupts(1000, "connecting to server");
} catch(SocketTimeoutException e) { // namenode is busy
// Namenode 忙碌,则等待1 秒后重试
sleepAndLogInterrupts(1000, "connecting to server");
}
}

2.4.2. 确认namespaceId、 clusterId

调用 registrationSucceeded() 确认 namespaceld、 clusterId与其他 Namenode 返回的信息一致,并且确认 DatanodeUUid与 Datanode 本地存储一致

1
bpos.registrationSucceeded(this, bpRegistration);

2.4.3. 对块汇报做一个随机延迟

1
scheduleBlockReport(dnConf.initialBlockReportDelay);