Hadoop-组件-HDFS-源码学习-集群启动-DataNode 启动-心跳
一、概述
二、源码
offerService()方法是 BPServiceActor 的主循环方法,它用于向 Namenode 发送心跳、块汇报、缓存汇报以及增量汇报。offerService()方法会一直循环运行,直到 Datanode 关闭或者客户端调用 ClientDatanodeProtocol.refreshNodes()重新加载 Namenode 配置。
offerService() 包括以下几个部分:
2.1. 定期发送心跳
offerService()是通过调用 DatanodeProtocol.sendHeartbeat()方法向 Namenode 发送心跳的。
2.1.1. 心跳时间间隔
offerService() 方法会以 dnConf.heartBeatInterval
(默认配置是3 秒)间隔向 Namenode 发送心跳。
1 | if (startTime - lastHeartbeat >= dnConf.heartBeatInterval){ |
2.1.2. 发送心跳
offerService() 通过调用 DatanodeProtocol#sendHeartbeat() 方法向 Namenode 发送心跳。Datanode 向 Namenode 发送的心跳信息主要包括:Datanode 注册信息、Datanode 存储信息(使用容量,剩余容量等)、缓存信息、当前 Datanode 写文件的连接数,以及读写数据使用的线程数等
1 |
|
Namenode 收到 Datanode 的心跳之后,会返回一个心跳响应 HeartbeatResponse。这个心跳响应中包含一个 DatanodeCommand 的数组,用来携带 Namenode 对 Datanode 的名字节点指令。
同时心跳响应中还包含一个 NNHAStatusHeartbeat 对象,用来标识当前 Namenode 的 HA状态。Datanode 会使用这个字段来确定 BPOfferService 当中的哪一个 BPServiceActor 对应的Namenode 是 Active 状态的。
2.1.3. 处理心跳响应
调用 bpos.updateActorStatesFromHeartbeat() 方法处理心跳响应当中的 HA 状态字目NNHA StatusHeartbeat, 也就是更新 BPOfferService 中的 Active Namenode 的引用。
1 | bpos.updateActorStatesFromHeartbeat(this, resp.getNameNodeHaState()); |
BPOfferService#updateActorStatesFromHeartbeat() 方法用于处理心跳响应中携带的Namenode HA 状态。
1 | // Namenode 携带的 txid |
Standby Namenode 切换成 Active Namenode。
如果发生这种情况,就需要对 BPOfferService 的 bpServiceToActive 字段进行更改
bpService ToActive 字段记录了 BPOfferService 认为是 Active 状态的 Namenode 对应的 BPServiceActor 对象的引用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 原来的 Standby Namenode 声明自己为 Active Namenode,发生状态切换
if (nnClaimsActive && !bposThinksActive) {
// 当前 Namenode 携带的 txid 小于原 Active Namenode 的txid
// 也就是有两个 Namenode 声明自己为 Active,但是当前 Namenode 的请求过时
if (!isMoreRecentClaim) { //对于过时的请求,则直接忽略
return;
} else { // 当前的请求是最新的请求
if (bpServiceToActive == null) {
// BPofferservice 还没有保存 active namenode
} else {
// 当前 Namenode 的请求是最新的请求
}
// 将 bpServiceToActive 指向当前 Namenode 对应的 BPServiceActor
bpServiceToActive = actor;
}
}为了防止脑裂,也就是两个 Namenode 都声明自己是 AcitveNamenode 的情况。Namenode 心跳响应中的 NNHAStatusHeartbeat 对象专门携带了一个 txid 字段,只有携带大的 txid 的 Namenode 才能作为 ActiveNamenode。
Active 状态的 Namenode 切换为Standby 状态
直接将 BPOfferService.bpService ToActive 字段赋值为 null
1
2
3
4// 原来 Active 状态的 Namenode 现在声明自己为 Standby
if (!nnClaimsActive && bposThinksActive) {
bpServiceToActive = null;
}最后更新 lastActiveclaimTxid
1
2
3
4if (bpServiceToActive == actor) {
assert txid >= lastActiveClaimTxId;
lastActiveClaimTxId = txid;
}
2.1.4. 执行心跳返回的指令
完成上面的操作以后,offerService()方法就会调用 processCommand() 方法执行 Namenode 通过心跳返回的名字节点指令
1 | if (!processCommand(resp.getCommands())) continue; |
processCommand() 方法首先遍历心跳响应中携带的指令,然后调用BPOfferService.processCommandFromActor() 方法处理指令。
1 | for (DatanodeCommand cmd : cmds) { |
2.2. 最近新添加和删除的数据块
Datanode 除了定期向 Namenode 发送心跳外,还需要向 Namenode 汇报 Datanode 最近新添加和删除的数据块(汇报间隔是100*心跳间隔,也就是300秒)。
在 offerService()方法中,调用 reportReceivedDeletedBlocks()方法执行这个操作。
1 | if (sendImmediateIBR || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) { |
2.2.1. 维护 Datastorage 上新添加的数据块信息以及刚删除的数据块信息
pendingIncrementalBRperStorage 是一个 Map,维护 DataStorage 上存储的两次汇报之间新添加和删除的数据块。BPServiceActor 提供了 notifyNamenodeBlock() 以及 notifyNamenodeDeleted Block()方法用于向 pendingIncrementalBRperStorage 中添加更新的数据块信息。
将新添加的数据块信息加入 pendingIncrementalBRperstorage 中
notifyNamenodeBlock() 用于添加一个 Datanode 新接收的数据块信息
对于新添加的数据块,notifyNamenodeBlock() 方法会将 sendlmmediateIBR 字段设置为 true,并且唤醒在pendingIncrementalBRperStorage 上等待新数据块的 offerService()方法,也就是 offerService()会立即将添加数据块的信息发送给 Namenode。
将刚删除的数据块信息加入 pendingIncrementalBRperstorage 中
notifyNamenodeDeletedBlock() 用于添加一个 Datanode 新删除的数据块信息。
reportReceivedDeletedBlocks()方法从 pendingIncrementalBRperStorage 对象中取出两次块汇报之间增量的、还没有向 Namenode 汇报的数据块信息。然后将这些信息放入 reports 变量中
2.2.2. 汇报
reportReceivedDeletedBlocks() 通过调用 RPC 方法 DatanodeProtocol.blockReceivedAndDeleted() 将汇报发送给 Namenode。
1 | bpNamenode.blockReceivedAndDeleted(bpRegistration, |
2.2.3. 失败处理
如果这次 RPC 调用失败,则将 reports 变量中的信息重新放回 pendingIncrementalBRperStorage 中。然后将 sendlmmediateIBR 赋值为 true,立即重发ReceivedDeletedBlockReport。如果当前没有工作且 sendlmmediateIBR 为 false,那么 offerService()方法会在 pendingIncrementalBRperStorage 对象上等待,直到超时或者被唤醒。
1 | if (!success) {// 汇报不成功的话 |
2.3. 数据块汇报
Datanode 会在启动时或者间隔 blockReportInterval 时向 Namenode 发送块汇报,包括 Datanode 上存储的所有数据块信息。Namenode 会通过响应返回名字节点指令。
1 | List<DatanodeCommand> cmds = blockReport(); |
offerService()首先调用blockReport()方法发送块汇报,然后接收 Namenode 返回的名字节点指令 cmds,最后调用 processCommand()方法执行这些指令。
2.3.1. 触发时机
blockReport()方法每隔 blockReportlnterval(默认间隔是6个小时)会触发一次块汇报。
1 | // 如果当前时间 startTime 减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回 null, |
2.3.2. 方法发送数据块增量汇报
首先调用 DatanodeProtocol.reportReceivedDeletedBlocks() 方法向 Namenode 汇报 Datanode 最近添加与删除的数据块,以避免 Namenode 和 Datanode 元数据不同步。
2.3.3. 获得当前块池的块汇报信息
然后调用FSDataselmpl.getBlockReports() 获得当前块池的块汇报信息。
1 | Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists = |
2.3.4. 块汇报信息转换为 StorageBlockReport 类型
将获取的块汇报信息转换为 StorageBlockReport 类型
1 | int i = 0; |
2.3.5. 汇报
再通过 RPC方法 DatanodeProtocol.blockReport() 发送这个数据块汇报到 Namenode,保存 Namenode 返回的名字节点指令。
1 | // 根据数据块总数目判断是否需要多次发送消息 |
2.3.6. 调度下一次数据块汇报
1 | scheduleNextBlockReport(startTime); |
2.4. 缓存数据块汇报
缓存汇报的逻辑类似于块汇报,offerService()方法首先调用 cacheReport()方法,向Namenode 汇报当前 Datanode 的缓存情况,然后调用 processCommand()方法执行 Namenode携带回的指令。
1 | // 缓存块汇报 |
2.5. 启动数据块扫描操作
offerService() 方法会启动当前块池的数据块扫描功能
2.6. 线程睡眠等待
offerService()方法会在 pendingIncrementalBRperStorage 对象上等待,直到下一个心跳周期或者被唤醒。
这里的唤醒操作是在 notifyNamenodeBlock()方法中执行的