refreshNamenodes() 方法是 DataNode 启动的核心代码之一啦!内容很多~,开始看源码~~~过程中我们还会针对 DataNode 锁进行优化哦😯~~~

一、概述

在 DataNode 的构造方法里,初始化了 BlockPoolManager 对象,通过其 blockPoolManager.refreshNamenodes(conf);从配置文件中获取该DataNode 相关的 namenode信息,然后向其发生注册和心跳信息。

1.1. 相关类

1.1.1. BPServiceActor

在 BlockPoolManager 的实现中,使用 BPServiceActor 类负责与一个 Namenode 通信。每 BPServiceActor 的实例都是一个独立运行的线程,这个线程主要实现了以下功能。

  • 与 Namenode 进行第一次握手,获取命名空间的信息。
  • 向 Namenode 注册当前 Datanode。
  • 定期向 Namenode 发送心跳、增量块汇报、全量块汇报以及缓存块汇报。
  • 执行 Namenode 传回的名字节点指令。

1.1.2. BPOfferService

在 HDFS Federation 部署中,一个 HDFS 集群可以定义多个命名空间,每一个命名空间在 Datanode 上都有一个对应的块池存储这个命名空间的数据块,这个块池是由一个 BPOfferService 实例管理的。由于在 HDFS HA 部署中,每个命名空间又会同时拥有两个 Namenode,一个作为活动的(Active) Namenode, 另一个作为热备的(Standby) Namenode,所以每个 BPOfferService 都会包含两个 BPServiceActor 对象。每个 BPServiceActor 对象都封装了与该命名空间中单个 Namenode 的操作,包括定时向这个Namenode 发送心跳(heartbeat)、增量块汇报(blockReceivedAndDeleted )、全量块汇报(blockreport)、缓存块汇报(cacheReport),以及执行 Namenode 通过心跳/块汇报响应传回的名字节点指令等操作。同时为了防止出现脑裂的情况,需要保证命名空间中有且只能有一个处于活动状态的 Namenode, BPOfferService 类还需要管理当前 Datanode 认为是 Active 状态的 Namenode 的引用(通过 bpService ToActive 字段)

1.1.3. BlockPoolManager

BlockPoolManager 用于管理 DataNode上所有的块池。 BlockPoolManager 会持有多个 BPOfferService 对象,每个 BPOfferService 对象都封装了对单个块池操作的 API。

二、源码

开始调试源码~~~

得到时序图~

从配置信息 conf 中获取 nameserviceid->{namenode名称->InetSocketAddress} 的映射集合 newAddressMap,第二步调用 doRefreshNamenodes()方法执行集合 newAddressMap 中 NameNodes 的刷新。

2.1. 从配置信息 conf 中获取 newAddressMap

从配置信息conf中获取 nameserviceid->{namenodeid->InetSocketAddress} 的映射集合 newAddressMap

1
2
Map<String, Map<String, InetSocketAddress>> newAddressMap =
DFSUtil.getNNServiceRpcAddressesForCluster(conf);

2.1.1. 获取 NameNode 的地址(localhost:9000)

1
InetSocketAddress address = NameNode.getAddress(conf);

2.1.2. 获取 hdfs 的内部命名服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Collection<String> parentNameServices = conf.getTrimmedStringCollection(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
if (parentNameServices.isEmpty()) {// 如果没有配置dfs.internal.nameservices
//获取dfs.nameservices,赋值给集合parentNameServices
parentNameServices = conf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
} else {
// Ensure that the internal service is ineed in the list of all available
// 获取dfs.nameservices
Set<String> availableNameServices = Sets.newHashSet(conf
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
// 验证parentNameServices中的每个nsId在dfs.nameservices中是否都存在
for (String nsId : parentNameServices) {
if (!availableNameServices.contains(nsId)) {
throw new IOException("Unknown nameservice: " + nsId);
}
}
}

2.1.3. 获取 nameserviceId->{nameNodeId->InetSocketAddress} 对应关系的集合

1
Map<String, Map<String, InetSocketAddress>> addressList = getAddressesForNsIds(conf, parentNameServices, defaultAddress, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
  1. 通过 getAddressesForNameserviceId() 方法获取 nameNodeId->InetSocketAddress 的对应关系,$nameNodeId$ 来自参数 dfs.ha.namenodes.nsId
    • 获取 dfs.ha.namenodes.nsId
    • 根据 keys 获取 address
    • 将 address 封装成 InetSocketAddress,得到 isa
    • nnId->InetSocketAddress 的对应关系放入到 Map 中
  2. nameserviceId->{nameNodeId->InetSocketAddress} 的对应关系放入集合 ret
  3. 最后返回 nameserviceId->{nameNodeId->InetSocketAddress} 对应关系的集合 ret。

2.2. 刷新

doRefreshNamenodes() 方法用于根据 HDFS 配置添加、删除以及更新命名空间。在BlockPoolManager 的实现中,就是对指定命名空间的 BPOfferSerivce 引用进行更新。本方法分成 6 个步骤来完成对命名空间的新增、刷新、删除、以及对新增命名空间添加 BPOfferService 线程,并启动所有线程

1
doRefreshNamenodes(newAddressMap)

确保当前线程在 refreshNamenodesLock 上拥有互斥锁

1
Thread.holdsLock(refreshNamenodesLock)

构造 toRefresh、 toAdd、 toRemove 三个队列,添加、更新、删除 BPOfferService,方法中的 addrMap 变量保存了配置的命名空间列表,bpByNameserviceld 变量则保存了当前BlockPoolManager 中己有的命名空间列表。

  1. 待刷新的 toRefresh

    如果命名空间列表中存在,BlockPoolManager 的 Map 中也存在,则将当前的 nameserviceld 添加到 toRefresh 队列中。

    1
    Set<String> toRefresh = Sets.newLinkedHashSet();
  2. 待添加的 toAdd

    如果命名空间列表中存在,BlockPoolManager 的 Map 中不存在,则将当前的 nameserviceld 添加到 toAdd 队列中。

    1
    Set<String> toAdd = Sets.newLinkedHashSet();

    遍历 addrMap,有多少套联邦,就会有多少 Nameservice,并添加到 toAdd 集合

    针对 nameserviceid->{namenode名称->InetSocketAddress} 的映射集合 newAddressMap 中每个 nameserviceid,确认它是一个完全新 nameservice,还是一个其 NameNode 列表被更新的 nameservice,分别加入待添加 toAdd 和待刷新 toRefresh 集合

    1
    2
    3
    4
    5
    6
    7
    8
    for (String nameserviceId : addrMap.keySet()) {
    // 如果 bpByNameserviceId 集合中存在 nameserviceId,加入待刷新集合 toRefresh,否则加入到待添加集合 toAdd
    if (bpByNameserviceId.containsKey(nameserviceId)) {
    toRefresh.add(nameserviceId);
    } else {
    toAdd.add(nameserviceId);
    }
    }
  3. 待移除的 toRemove

    如果 BlockPoolManager 的 Map 中存在,命名空间列表中不存在,则将当前的 nameserviceld 添加到 toRemove 队列中。

    1
    Set<String> toRemove;

2.2.1. 处理 toAdd 队列

toAdd 队列中保存了需要添加到 BlockPoolManager 中的命名空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (!toAdd.isEmpty()) {
for (String nsToAdd : toAdd) {
// 从 addrMap 中根据 nameserviceId 获取对应 Socket 地址 InetSocketAddress,创建集合 addrs
ArrayList<InetSocketAddress> addrs = Lists.newArrayList(addrMap.get(nsToAdd).values());
BPOfferService bpos = createBPOS(addrs);
// 将 nameserviceId->BPOfferService 的对应关系添加到集合 bpByNameserviceId 中
bpByNameserviceId.put(nsToAdd, bpos);
// 将 BPOfferService 添加到集合 offerServices 中
offerServices.add(bpos);
}
}
// TODO 启动所有 offerServices 的 BPOfferService
startAll();
}
  1. 创建 BPOfferService 对象

    refreshNamenodes() 方法会首先调用 createBPOS() 方法创建 BPOfferService 对象

    1
    2
    3
    protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
    return new BPOfferService(nnAddrs, dn);
    }

    BPOfferService 的构造方法会对命名空间中定义的每一个 Namenode 构造其对应的 BPServiceActor 对象,并将这些新创建的 BPServiceActor 对象放入 BPOferService 的数据结构中保存。

    1
    2
    3
    4
    5
    6
    7
    BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    this.dn = dn;
    // 每个 Namenode 一个 BPServiceActor
    for (InetSocketAddress addr : nnAddrs) {
    this.bpServices.add(new BPServiceActor(addr, this));
    }
    }
  2. BPOfferService 构造完成后,将会放入BlockPoolManager.bpByNameserviceld 映射中保存

    将 nameserviceId->BPOfferService 的对应关系添加到集合 bpByNameserviceId 中

  3. 添加新创建的 BPOfferService

调用 startAll() 方法启动所有的 BPOfferService, BPOfferService 会级联启动所有的 BPServiceActor 的工作线程。通过BPOfferService#start()方法循环启动 BPServiceActor 线程,BPServiceActor 向其对应的 namenode发送注册和心跳消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
//逐个调用 BPOfferService#start(),启动BPOfferService
for (BPOfferService bpos : offerServices) {
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
//...
}
}

BPServiceActor 是个线程,具体的实现方法在BPServiceActor#run()方法中。所以我们定位到其 run()方法里~~~

  1. 与 NameNode握手,注册

    BPServiceActor.run()方法会首先调用 BPServiceActor.connectToNNAndHandshake()方法与Namenode 握手并初始化 Datanode 上该命名空间对应块池(BlockPool)的存储,然后在该Namenode 上注册当前 Datanode。

    1
    connectToNNAndHandshake();

  2. offerService()

    执行完初始化操作之后,run()方法会循环调用 BPServiceActor.offerService()方法定期向Namenode 发送心跳、块汇报、缓存汇报以及增量汇报。offerService()方法还会处理 Namenode通过响应带回的名字节点指令。

    offerService() 方法会一直循环运行,直到 Datanode 关闭或者客户端调用 ClientDatanodeProtocol.refreshNodes() 重新加载 Namenode 配置。

    1
    offerService();

2.2.2. 处理 toRemove 集合元素

toRemove 队列中保存了要从 BlockPoolManager 中删除的命名空间。对于 toRemove 队列中保存的命名空间,停止它对应的 BPOfferService 的服务,也就是停止 BPOfferService 中所有的 BPServiceActor 线程。当 BPOfferService 的所有 BPServiceActor 线程都停止后,会自动
从 BlockPoolManager.bpByNameserviceld 映射中移除当前的 BPOfferService 对象。

1
2
3
4
5
6
7
if (!toRemove.isEmpty()) {
for (String nsToRemove : toRemove) {
BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
bpos.stop();
bpos.join();
}
}

2.2.3. 处理 toRefresh 队列

对于 toRefresh 队列中保存的命名空间,则调用 BPOfferService.refreshNNList() 方法更新该命名空间中的 Namenode 列表。

1
2
3
4
5
6
7
8
9
if (!toRefresh.isEmpty()) {
for (String nsToRefresh : toRefresh) {
BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
// 根据 BPOfferService 从配置信息 addrMap 中取出 NN 的 Socket 地址 InetSocketAddress,形成列表 addrs
ArrayList<InetSocketAddress> addrs = Lists.newArrayList(addrMap.get(nsToRefresh).values());
// 调用 BPOfferService 的 refreshNNList() 方法根据 addrs 刷新 NN 列表
bpos.refreshNNList(addrs);
}
}

三、源码修改

3.1. DataNode 锁优化

3.1.1. 存在问题

总体来说HDFS的锁是用得比较重的,有很多地方都值得优化,本次我们挑一个典型的地方进行锁的优化。我们看一下BPOfferService这个类,里面有读写锁的使用。

心跳 DatanodeProtocol.sendHeartbeat() 方法是一个高频的操作,心跳的方法会调用getBlockPoolId() 方法

在 getBlockPoolId() 方法中调用了写锁

上面心跳的方法 DatanodeProtocol.sendHeartbeat() 是一个高频的操作,默认3 秒一次,也就是说会因为调用getBlockPoolId方法而高频的加了读锁。

3.1.2. 修改

  1. 在 BPOfferService 新增一个 变量 blockPoolID,使用 volatile 保证其可见性

  2. blockPoolID 赋值

    BPOfferService 通过 bpNSInfo 字段保存当前块池对应的命名空间信息。如果这个字段为空,则证明当前 BPServiceActor 对应 Namenode 是第一个响应握手请求的 Namenode。

    verifyAndSetNamespacelnfo() 方法会首先对 bpNSInfo 赋值,然后调用 Datanode.initBlockPool() 初始化命名空间对应块池的本地存储。如果如果不是第一次连接,则命名空间中定义的另一个 Namenode 已经提前注册并且初始化了本地存储

  3. getBlockPoolId() 方法优化如下: