Hadoop-组件-HDFS-源码学习-集群启动-DataNode 启动-注册
一、概述
HDFS是一个主从架构,NameNode是主节点,DataNode从节点,DataNode启动时需要和NameNode进行注册
1.1. 相关类
1.1.1. DataStorage
Datanode 最重要的功能就是管理磁盘上存储的 HDFS 数据块。Datanode 将这个管理功能切分为两个部分:
管理与组织磁盘存储目录(由 dfs.data.dir 指定),如 current、 previous、 tmp 等,这个功能由 DataStorage 类实现
如图:Data Storage 的父类为 Storage,Storagelnfo 为根接口,描述存储的基本信息。子类 Storage 是抽象类,它为 Datanode、 Namenode 提供抽象的存储服务。
一个 Storage 可以定义多个存储目录,存储目录由 Storage 的内部类 StorageDirectory 描述,StorageDirectory 类定义了存储目录上的通用操作。如下配置所示,Datanode 定义了六个数据存储目录,HDFS 会使用一个 DataStorage 对象管理整个Datanode 的存储,而这六个存储目录则由两个 StorageDirectory 对象管理。
1
2
3
4
5
6
7
8
9
10<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data,
file://${hadoop.tmp.dir}/dfs2/data,
file://${hadoop.tmp.dir}/dfs3/data,
file://${hadoop.tmp.dir}/dfs4/data,
file://${hadoop.tmp.dir}/dfs5/data,
file://${hadoop.tmp.dir}/dfs6/data
</value>
</property>管理与组织数据块及其元数据文件,这个功能主要由 FsDatasetlmpl 相关类实现
二、源码分析
看源码啦~~~
connectToNNAndHandshake()方法用于与 Namenode 握手,初始化 Datanode 存储并注册当前 Datnaode。connectToNNAndHandshake()方法的操作分为以下 4个步骤
1 | connectToNNAndHandshake(); |
2.1. 获取 Namenode 的 RPC 代理。
connectToNN() 构造 DatanodeProtocolClientSideTranslatorPB 引用作为 Datanode 向 Namenode 发起 RPC 调用的句柄。
1 | bpNamenode = dn.connectToNN(nnAddr); //get NN proxy |
2.2. 第一阶段握手
通过调用 RPC 方法 DatanodeProtocol.versionRequest(),获取当前块池对应的命名空间的信息 NamespaceInfo 。
1 | NamespaceInfo nsInfo = retrieveNamespaceInfo(); |
NamespaceInfo 包含了一些存储管理相关的信息,数据节点的后续处理,如版本检查、注册,都需要使用 NamespaceInfo 中的信息
调用 RPC 方法 DatanodeProtocol.versionRequest() 获取命名空间的信息,然后调用checkNNVersion()方法验证 Namenode 版本与当前 Datanode 版本的兼容性。
1 | NamespaceInfo retrieveNamespaceInfo() throws IOException { |
2.3. 初始化命名空间对应块池
确认获取的命名空间信息与命名空间中另一个 Namenode 获取的信息匹配。同时,如果这是第一个 Namenode 的连接,则在 Datanode 上初始化命名空间对应块池的存储
1 | bpos.verifyAndSetNamespaceInfo(nsInfo); |
verifyAndSetNamespacelnfo() 方法会首先对 bpNSInfo 赋值,然后调用 Datanode.initBlockPool() 初始化命名空间对应块池的本地存储。如果这个字段不为空,则命名空间中定义的另一个 Namenode 已经提前注册并且初始化了本地存储,这时只需要将当前注册信息与己有的注册信息比较,确认 blockPoolld、 namespaceld、 clusterld等字段相等即可。
2.3.1. 首次连接 NameNode
BPOfferService 通过 bpNSInfo 字段保存当前块池对应的命名空间信息。如果这个字段为空,则证明当前 BPServiceActor 对应 Namenode 是第一个响应握手请求的 Namenode。 如果是第一次连接 NameNode,则初始化 BlockPool 块池
1 | if (this.bpNSInfo == null) { |
添加块池
BlockPoolManager 用于管理 DataNode上所有的块池。 BlockPoolManager 会持有多个 BPOfferService 对象,每个 BPOfferService 对象都封装了对单个块池操作的 API。
初步初始化存储结构
第一次握手,就是注册存储数据块的集合,并且根据需求来判断是否需要数据回滚和数据恢复
在StorageLocation不为空的情况下,根据需求进行回滚或者恢复数据
初始化DataStorage
DataNode一个重要的功能就是管理磁盘存储的数据块,DataNode将这个功能切分为2个部分:管理与组织磁盘目录,由DataStorage实现
1
2
3
4synchronized (this) {
//在 StorageLocation 不为空的情况下,根据需求进行回滚或者恢复数据
storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
}给 DataNode 生成新的 uuid
初始化 FsDatasetImpl
BlockPoolSlice 负责管理单个存储目录下单个块池的所有数据块;FsVolumeImpl 则负责管理一个完整的存储目录下所有的数据块,也就包括了这个存储目录下多个 BlockPoolSlice 对象的引用。Datanode 可以定义多个存储目录,也就是定义多个 FsVolumeImpl 对象,在 HDFS 中使用 FsVolumeList 对象统一管理 Datanode 上定义的多个 FsVolumelmpl 对象。FsDatasetImpl 负责管理 Datanode 上所有数据块功能的类
检查磁盘损坏,然后从 DataNode 中移除有问题的目录
1
checkDiskError();
启动扫描器 DirectoryScanner
DirectoryScanner 的主要任务是定期扫描磁盘上的数据块,检查磁盘上的数据块信息是否与 FsDatasetlmpl 中保存的数据块信息一致,如果不一致则对 FsDatasetImpl中的信息进行更新。
注意,DirectoryScanner 只会检查内存和磁盘上 FINALIZED 状态的数据块是否一致。
1
initDirectoryScanner(conf);
DirectoryScanner 对象会定期(由 dfs.datanode.directoryscan.interval 配置,默认为 21600秒)在线程池对象 masterThread 上触发扫描任务,这个扫描任务是由 DirectoryScanner.reconcile()方法执行的。reconcile()会首先调用 scan()方法收集磁盘上数据块与内存中数据块的差异信息,并把这些差异信息保存在 diffs 字段中。
scan()方法在获取磁盘上存储的数据块时使用了reportCompileThreadPool 线程池,异步地完成磁盘数据块的扫描任务。
reconcile()方法拿到 scan()更新的 diffs 对象后,调用 FsDataset 的 checkAndUpdate()方法更新 FsDatasetlmpl 保存的数据块副本信息,完成与磁盘上数据块副本的同步操作。
将 BlockPool 添加到 FsDatasetIpml,并继续初始化存储结构
1
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
FsDatasetImpl#addBlockPool() 操作的主要对象具体到了各 BlockPool,完成blockpool、current、rbw、tmp等目录的检查、恢复或初始化
1
2
3
4
5
6
7
8
9
10public void addBlockPool(String bpid, Configuration conf) throws IOException {
synchronized(this) {
// 添加 BlockPool
volumes.addBlockPool(bpid, conf);
// 初始化 ReplicaMap 中 BlockPool 的映射
volumeMap.initBlockPool(bpid);
}
// 将所有副本加载到 FsDatasetImpl#volumeMap 中
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
}
2.3.2. 不是首次连接(刷新)
需要将当前注册信息与己有的注册信息比较,确认 blockPoolld、 namespaceld、 clusterld等字段相等即可。
2.4. 第二阶段握手
调用 BPServiceActor.register() 方法向 Namenode 注册当前 Datanode。
1 | register(nsInfo); |
第二阶段的握手,调用 DatanodeProtocol.registerDatanode()方法在 Namenode 上注册当前 Datanode。注册成功后调用 BPOfferService.registrationSucceeded() 方法确认 namespaceld、clusterld 与命名空间中定义的另一个 Namenode 返回的信息一致,同时确认 DatanodeUUid 与 Datanode 本地存储一致。
2.4.1. 注册 Datanode
1 | while (shouldRun()) { |
2.4.2. 确认namespaceId、 clusterId
调用 registrationSucceeded() 确认 namespaceld、 clusterId与其他 Namenode 返回的信息一致,并且确认 DatanodeUUid与 Datanode 本地存储一致
1 | bpos.registrationSucceeded(this, bpRegistration); |
2.4.3. 对块汇报做一个随机延迟
1 | scheduleBlockReport(dnConf.initialBlockReportDelay); |