一、概述

二、源码

offerService()方法是 BPServiceActor 的主循环方法,它用于向 Namenode 发送心跳、块汇报、缓存汇报以及增量汇报。offerService()方法会一直循环运行,直到 Datanode 关闭或者客户端调用 ClientDatanodeProtocol.refreshNodes()重新加载 Namenode 配置。

offerService() 包括以下几个部分:

2.1. 定期发送心跳

offerService()是通过调用 DatanodeProtocol.sendHeartbeat()方法向 Namenode 发送心跳的。

2.1.1. 心跳时间间隔

offerService() 方法会以 dnConf.heartBeatInterval(默认配置是3 秒)间隔向 Namenode 发送心跳。

1
2
3
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval){
//...
}

2.1.2. 发送心跳

offerService() 通过调用 DatanodeProtocol#sendHeartbeat() 方法向 Namenode 发送心跳。Datanode 向 Namenode 发送的心跳信息主要包括:Datanode 注册信息、Datanode 存储信息(使用容量,剩余容量等)、缓存信息、当前 Datanode 写文件的连接数,以及读写数据使用的线程数等

1
2
3
4
5
6
7
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
...
}

Namenode 收到 Datanode 的心跳之后,会返回一个心跳响应 HeartbeatResponse。这个心跳响应中包含一个 DatanodeCommand 的数组,用来携带 Namenode 对 Datanode 的名字节点指令。

同时心跳响应中还包含一个 NNHAStatusHeartbeat 对象,用来标识当前 Namenode 的 HA状态。Datanode 会使用这个字段来确定 BPOfferService 当中的哪一个 BPServiceActor 对应的Namenode 是 Active 状态的。

2.1.3. 处理心跳响应

调用 bpos.updateActorStatesFromHeartbeat() 方法处理心跳响应当中的 HA 状态字目NNHA StatusHeartbeat, 也就是更新 BPOfferService 中的 Active Namenode 的引用。

1
2
3
4
5
bpos.updateActorStatesFromHeartbeat(this, resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}

BPOfferService#updateActorStatesFromHeartbeat() 方法用于处理心跳响应中携带的Namenode HA 状态。

1
2
3
4
5
6
7
8
// Namenode 携带的 txid
final long txid = nnHaState.getTxId();
// BPOfferService 是否认为当前 Namenode 为 Active Namenode
final boolean nnClaimsActive = nnHaState.getState() == HAServiceState.ACTIVE;
// BPOfferservice 是否认为当前 Namenode 为 Active Namenode
final boolean bposThinksActive = bpServiceToActive == actor;
// 当前 Namenode 携带的 txid是否大于原 Active Namenode 携带的 txid
final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
  1. Standby Namenode 切换成 Active Namenode。

    如果发生这种情况,就需要对 BPOfferService 的 bpServiceToActive 字段进行更改

    bpService ToActive 字段记录了 BPOfferService 认为是 Active 状态的 Namenode 对应的 BPServiceActor 对象的引用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 原来的 Standby Namenode 声明自己为 Active Namenode,发生状态切换
    if (nnClaimsActive && !bposThinksActive) {
    // 当前 Namenode 携带的 txid 小于原 Active Namenode 的txid
    // 也就是有两个 Namenode 声明自己为 Active,但是当前 Namenode 的请求过时
    if (!isMoreRecentClaim) { //对于过时的请求,则直接忽略
    return;
    } else { // 当前的请求是最新的请求
    if (bpServiceToActive == null) {
    // BPofferservice 还没有保存 active namenode
    } else {
    // 当前 Namenode 的请求是最新的请求
    }
    // 将 bpServiceToActive 指向当前 Namenode 对应的 BPServiceActor
    bpServiceToActive = actor;
    }
    }

    为了防止脑裂,也就是两个 Namenode 都声明自己是 AcitveNamenode 的情况。Namenode 心跳响应中的 NNHAStatusHeartbeat 对象专门携带了一个 txid 字段,只有携带大的 txid 的 Namenode 才能作为 ActiveNamenode。

  2. Active 状态的 Namenode 切换为Standby 状态

    直接将 BPOfferService.bpService ToActive 字段赋值为 null

    1
    2
    3
    4
    // 原来 Active 状态的 Namenode 现在声明自己为 Standby
    if (!nnClaimsActive && bposThinksActive) {
    bpServiceToActive = null;
    }

    最后更新 lastActiveclaimTxid

    1
    2
    3
    4
    if (bpServiceToActive == actor) {
    assert txid >= lastActiveClaimTxId;
    lastActiveClaimTxId = txid;
    }

2.1.4. 执行心跳返回的指令

完成上面的操作以后,offerService()方法就会调用 processCommand() 方法执行 Namenode 通过心跳返回的名字节点指令

1
if (!processCommand(resp.getCommands())) continue;

processCommand() 方法首先遍历心跳响应中携带的指令,然后调用BPOfferService.processCommandFromActor() 方法处理指令。

1
2
3
4
5
6
7
8
9
for (DatanodeCommand cmd : cmds) {
try {
if (bpos.processCommandFromActor(cmd, this) == false) {
return false;
}
} catch (IOException ioe) {
LOG.warn("Error processing datanode Command", ioe);
}
}

2.2. 最近新添加和删除的数据块

Datanode 除了定期向 Namenode 发送心跳外,还需要向 Namenode 汇报 Datanode 最近新添加和删除的数据块(汇报间隔是100*心跳间隔,也就是300秒)。

在 offerService()方法中,调用 reportReceivedDeletedBlocks()方法执行这个操作。

1
2
3
4
5
6
if (sendImmediateIBR || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
//发送数据块增量汇报
reportReceivedDeletedBlocks();
// 设置上次数据块增量汇报时间 lastDeletedReport 为 startTime
lastDeletedReport = startTime;
}

2.2.1. 维护 Datastorage 上新添加的数据块信息以及刚删除的数据块信息

pendingIncrementalBRperStorage 是一个 Map,维护 DataStorage 上存储的两次汇报之间新添加和删除的数据块。BPServiceActor 提供了 notifyNamenodeBlock() 以及 notifyNamenodeDeleted Block()方法用于向 pendingIncrementalBRperStorage 中添加更新的数据块信息。

  1. 将新添加的数据块信息加入 pendingIncrementalBRperstorage 中

    notifyNamenodeBlock() 用于添加一个 Datanode 新接收的数据块信息

    对于新添加的数据块,notifyNamenodeBlock() 方法会将 sendlmmediateIBR 字段设置为 true,并且唤醒在pendingIncrementalBRperStorage 上等待新数据块的 offerService()方法,也就是 offerService()会立即将添加数据块的信息发送给 Namenode。

  2. 将刚删除的数据块信息加入 pendingIncrementalBRperstorage 中

    notifyNamenodeDeletedBlock() 用于添加一个 Datanode 新删除的数据块信息。

reportReceivedDeletedBlocks()方法从 pendingIncrementalBRperStorage 对象中取出两次块汇报之间增量的、还没有向 Namenode 汇报的数据块信息。然后将这些信息放入 reports 变量中

2.2.2. 汇报

reportReceivedDeletedBlocks() 通过调用 RPC 方法 DatanodeProtocol.blockReceivedAndDeleted() 将汇报发送给 Namenode。

1
2
3
bpNamenode.blockReceivedAndDeleted(bpRegistration,
bpos.getBlockPoolId(),
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));

2.2.3. 失败处理

如果这次 RPC 调用失败,则将 reports 变量中的信息重新放回 pendingIncrementalBRperStorage 中。然后将 sendlmmediateIBR 赋值为 true,立即重发ReceivedDeletedBlockReport。如果当前没有工作且 sendlmmediateIBR 为 false,那么 offerService()方法会在 pendingIncrementalBRperStorage 对象上等待,直到超时或者被唤醒。

1
2
3
4
5
6
7
8
9
10
11
if (!success) {// 汇报不成功的话
synchronized (pendingIncrementalBRperStorage) {
for (StorageReceivedDeletedBlocks report : reports) {
// 将数据块再放回到 pendingIncrementalBRperStorage
PerStoragePendingIncrementalBR perStorageMap =
pendingIncrementalBRperStorage.get(report.getStorage());
perStorageMap.putMissingBlockInfos(report.getBlocks());
sendImmediateIBR = true; // 立即汇报的标志位 sendImmediateIBR 设置为true
}
}
}

2.3. 数据块汇报

Datanode 会在启动时或者间隔 blockReportInterval 时向 Namenode 发送块汇报,包括 Datanode 上存储的所有数据块信息。Namenode 会通过响应返回名字节点指令。

1
2
List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

offerService()首先调用blockReport()方法发送块汇报,然后接收 Namenode 返回的名字节点指令 cmds,最后调用 processCommand()方法执行这些指令。

2.3.1. 触发时机

blockReport()方法每隔 blockReportlnterval(默认间隔是6个小时)会触发一次块汇报。

1
2
3
4
// 如果当前时间 startTime 减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回 null,
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
return null;
}

2.3.2. 方法发送数据块增量汇报

首先调用 DatanodeProtocol.reportReceivedDeletedBlocks() 方法向 Namenode 汇报 Datanode 最近添加与删除的数据块,以避免 Namenode 和 Datanode 元数据不同步。

2.3.3. 获得当前块池的块汇报信息

然后调用FSDataselmpl.getBlockReports() 获得当前块池的块汇报信息。

1
2
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());

2.3.4. 块汇报信息转换为 StorageBlockReport 类型

将获取的块汇报信息转换为 StorageBlockReport 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
int i = 0;
//总汇报块数
int totalBlockCount = 0;
// 创建数据块汇报数组 StorageBlockReport,大小为上述 perVolumeBlockLists 的大小
StorageBlockReport reports[] = new StorageBlockReport[perVolumeBlockLists.size()];
// 遍历 perVolumeBlockLists
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
BlockListAsLongs blockList = kvPair.getValue();
// 汇报的数据块保存在数组
reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
// 累加要汇报的数据块儿数目
totalBlockCount += blockList.getNumberOfBlocks();
}

2.3.5. 汇报

再通过 RPC方法 DatanodeProtocol.blockReport() 发送这个数据块汇报到 Namenode,保存 Namenode 返回的名字节点指令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 根据数据块总数目判断是否需要多次发送消息
// split 阈值取参数 dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// 如果数据块总数目在 split 阈值之下,则将所有的数据块汇报信息放在一个消息中发送
// 向 NameNode 发送数据块汇报信息
DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId));
numRPCs = 1;
numReportsSent = reports.length;
// 将数据块汇报后返回的命令 cmd 加入到命令列表cmds
if (cmd != null) {
cmds.add(cmd);
}
} else {
for (int r = 0; r < reports.length; r++) {
StorageBlockReport singleReport[] = { reports[r] };
DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId));
numReportsSent++;
numRPCs++;
if (cmd != null) {
cmds.add(cmd);
}
}
}

2.3.6. 调度下一次数据块汇报

1
scheduleNextBlockReport(startTime);

2.4. 缓存数据块汇报

缓存汇报的逻辑类似于块汇报,offerService()方法首先调用 cacheReport()方法,向Namenode 汇报当前 Datanode 的缓存情况,然后调用 processCommand()方法执行 Namenode携带回的指令。

1
2
3
// 缓存块汇报
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });

2.5. 启动数据块扫描操作

offerService() 方法会启动当前块池的数据块扫描功能

2.6. 线程睡眠等待

offerService()方法会在 pendingIncrementalBRperStorage 对象上等待,直到下一个心跳周期或者被唤醒。

这里的唤醒操作是在 notifyNamenodeBlock()方法中执行的