Hadoop-组件-HDFS-源码学习-集群启动-DataNode 启动-refreshNamenodes
refreshNamenodes() 方法是 DataNode 启动的核心代码之一啦!内容很多~,开始看源码~~~过程中我们还会针对 DataNode 锁进行优化哦😯~~~
一、概述
在 DataNode 的构造方法里,初始化了 BlockPoolManager 对象,通过其 blockPoolManager.refreshNamenodes(conf);从配置文件中获取该DataNode 相关的 namenode信息,然后向其发生注册和心跳信息。
1.1. 相关类
1.1.1. BPServiceActor
在 BlockPoolManager 的实现中,使用 BPServiceActor 类负责与一个 Namenode 通信。每 BPServiceActor 的实例都是一个独立运行的线程,这个线程主要实现了以下功能。
- 与 Namenode 进行第一次握手,获取命名空间的信息。
- 向 Namenode 注册当前 Datanode。
- 定期向 Namenode 发送心跳、增量块汇报、全量块汇报以及缓存块汇报。
- 执行 Namenode 传回的名字节点指令。
1.1.2. BPOfferService
在 HDFS Federation 部署中,一个 HDFS 集群可以定义多个命名空间,每一个命名空间在 Datanode 上都有一个对应的块池存储这个命名空间的数据块,这个块池是由一个 BPOfferService 实例管理的。由于在 HDFS HA 部署中,每个命名空间又会同时拥有两个 Namenode,一个作为活动的(Active) Namenode, 另一个作为热备的(Standby) Namenode,所以每个 BPOfferService 都会包含两个 BPServiceActor 对象。每个 BPServiceActor 对象都封装了与该命名空间中单个 Namenode 的操作,包括定时向这个Namenode 发送心跳(heartbeat)、增量块汇报(blockReceivedAndDeleted )、全量块汇报(blockreport)、缓存块汇报(cacheReport),以及执行 Namenode 通过心跳/块汇报响应传回的名字节点指令等操作。同时为了防止出现脑裂的情况,需要保证命名空间中有且只能有一个处于活动状态的 Namenode, BPOfferService 类还需要管理当前 Datanode 认为是 Active 状态的 Namenode 的引用(通过 bpService ToActive 字段)
1.1.3. BlockPoolManager
BlockPoolManager 用于管理 DataNode上所有的块池。 BlockPoolManager 会持有多个 BPOfferService 对象,每个 BPOfferService 对象都封装了对单个块池操作的 API。
二、源码
开始调试源码~~~
得到时序图~
从配置信息 conf 中获取 nameserviceid->{namenode名称->InetSocketAddress} 的映射集合 newAddressMap,第二步调用 doRefreshNamenodes()方法执行集合 newAddressMap 中 NameNodes 的刷新。
2.1. 从配置信息 conf 中获取 newAddressMap
从配置信息conf中获取 nameserviceid->{namenodeid->InetSocketAddress} 的映射集合 newAddressMap
1 | Map<String, Map<String, InetSocketAddress>> newAddressMap = |
2.1.1. 获取 NameNode 的地址(localhost:9000)
1 | InetSocketAddress address = NameNode.getAddress(conf); |
2.1.2. 获取 hdfs 的内部命名服务
1 | Collection<String> parentNameServices = conf.getTrimmedStringCollection(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); |
2.1.3. 获取 nameserviceId->{nameNodeId->InetSocketAddress}
对应关系的集合
1 | Map<String, Map<String, InetSocketAddress>> addressList = getAddressesForNsIds(conf, parentNameServices, defaultAddress, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY); |
- 通过 getAddressesForNameserviceId() 方法获取
nameNodeId->InetSocketAddress
的对应关系,$nameNodeId$ 来自参数dfs.ha.namenodes.nsId
- 获取
dfs.ha.namenodes.nsId
- 根据 keys 获取 address
- 将 address 封装成 InetSocketAddress,得到 isa
- 将
nnId->InetSocketAddress
的对应关系放入到 Map 中
- 获取
- 将 nameserviceId->{nameNodeId->InetSocketAddress} 的对应关系放入集合 ret
- 最后返回 nameserviceId->{nameNodeId->InetSocketAddress} 对应关系的集合 ret。
2.2. 刷新
doRefreshNamenodes() 方法用于根据 HDFS 配置添加、删除以及更新命名空间。在BlockPoolManager 的实现中,就是对指定命名空间的 BPOfferSerivce 引用进行更新。本方法分成 6 个步骤来完成对命名空间的新增、刷新、删除、以及对新增命名空间添加 BPOfferService 线程,并启动所有线程
1 | doRefreshNamenodes(newAddressMap) |
确保当前线程在 refreshNamenodesLock 上拥有互斥锁
1 | Thread.holdsLock(refreshNamenodesLock) |
构造 toRefresh、 toAdd、 toRemove 三个队列,添加、更新、删除 BPOfferService,方法中的 addrMap 变量保存了配置的命名空间列表,bpByNameserviceld 变量则保存了当前BlockPoolManager 中己有的命名空间列表。
待刷新的 toRefresh
如果命名空间列表中存在,BlockPoolManager 的 Map 中也存在,则将当前的 nameserviceld 添加到 toRefresh 队列中。
1
Set<String> toRefresh = Sets.newLinkedHashSet();
待添加的 toAdd
如果命名空间列表中存在,BlockPoolManager 的 Map 中不存在,则将当前的 nameserviceld 添加到 toAdd 队列中。
1
Set<String> toAdd = Sets.newLinkedHashSet();
遍历 addrMap,有多少套联邦,就会有多少 Nameservice,并添加到 toAdd 集合
针对 nameserviceid->{namenode名称->InetSocketAddress} 的映射集合 newAddressMap 中每个 nameserviceid,确认它是一个完全新 nameservice,还是一个其 NameNode 列表被更新的 nameservice,分别加入待添加 toAdd 和待刷新 toRefresh 集合
1
2
3
4
5
6
7
8for (String nameserviceId : addrMap.keySet()) {
// 如果 bpByNameserviceId 集合中存在 nameserviceId,加入待刷新集合 toRefresh,否则加入到待添加集合 toAdd
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
toAdd.add(nameserviceId);
}
}待移除的 toRemove
如果 BlockPoolManager 的 Map 中存在,命名空间列表中不存在,则将当前的 nameserviceld 添加到 toRemove 队列中。
1
Set<String> toRemove;
2.2.1. 处理 toAdd 队列
toAdd 队列中保存了需要添加到 BlockPoolManager 中的命名空间
1 | if (!toAdd.isEmpty()) { |
创建 BPOfferService 对象
refreshNamenodes() 方法会首先调用 createBPOS() 方法创建 BPOfferService 对象
1
2
3protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
return new BPOfferService(nnAddrs, dn);
}BPOfferService 的构造方法会对命名空间中定义的每一个 Namenode 构造其对应的 BPServiceActor 对象,并将这些新创建的 BPServiceActor 对象放入 BPOferService 的数据结构中保存。
1
2
3
4
5
6
7BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
this.dn = dn;
// 每个 Namenode 一个 BPServiceActor
for (InetSocketAddress addr : nnAddrs) {
this.bpServices.add(new BPServiceActor(addr, this));
}
}BPOfferService 构造完成后,将会放入BlockPoolManager.bpByNameserviceld 映射中保存
将 nameserviceId->BPOfferService 的对应关系添加到集合 bpByNameserviceId 中
添加新创建的 BPOfferService
调用 startAll() 方法启动所有的 BPOfferService, BPOfferService 会级联启动所有的 BPServiceActor 的工作线程。通过BPOfferService#start()方法循环启动 BPServiceActor 线程,BPServiceActor 向其对应的 namenode发送注册和心跳消息。
1 | synchronized void startAll() throws IOException { |
BPServiceActor 是个线程,具体的实现方法在BPServiceActor#run()方法中。所以我们定位到其 run()方法里~~~
与 NameNode握手,注册
BPServiceActor.run()方法会首先调用 BPServiceActor.connectToNNAndHandshake()方法与Namenode 握手并初始化 Datanode 上该命名空间对应块池(BlockPool)的存储,然后在该Namenode 上注册当前 Datanode。
1
connectToNNAndHandshake();
引用本站文章Hadoop-组件-HDFS-源码学习-集群启动-DataNode-注册JokerofferService()
执行完初始化操作之后,run()方法会循环调用 BPServiceActor.offerService()方法定期向Namenode 发送心跳、块汇报、缓存汇报以及增量汇报。offerService()方法还会处理 Namenode通过响应带回的名字节点指令。
offerService() 方法会一直循环运行,直到 Datanode 关闭或者客户端调用 ClientDatanodeProtocol.refreshNodes() 重新加载 Namenode 配置。
1
offerService();
心跳
引用本站文章Hadoop-组件-HDFS-源码学习-集群启动-DataNode-心跳Joker数据块管理
Datanode 除了定期向 Namenode 发送心跳外,还需要向 Namenode 汇报 Datanode 最近新添加和删除的数据块(汇报间隔是100*心跳间隔,也就是300秒)。在 offerService() 方法中,调用 reporReceivedDeletedBlocks() 方法执行这个操作。
引用本站文章Hadoop-组件-HDFS-源码学习-集群启动-DataNode-数据块管理Joker
2.2.2. 处理 toRemove 集合元素
toRemove 队列中保存了要从 BlockPoolManager 中删除的命名空间。对于 toRemove 队列中保存的命名空间,停止它对应的 BPOfferService 的服务,也就是停止 BPOfferService 中所有的 BPServiceActor 线程。当 BPOfferService 的所有 BPServiceActor 线程都停止后,会自动
从 BlockPoolManager.bpByNameserviceld 映射中移除当前的 BPOfferService 对象。
1 | if (!toRemove.isEmpty()) { |
2.2.3. 处理 toRefresh 队列
对于 toRefresh 队列中保存的命名空间,则调用 BPOfferService.refreshNNList() 方法更新该命名空间中的 Namenode 列表。
1 | if (!toRefresh.isEmpty()) { |
三、源码修改
3.1. DataNode 锁优化
3.1.1. 存在问题
总体来说HDFS的锁是用得比较重的,有很多地方都值得优化,本次我们挑一个典型的地方进行锁的优化。我们看一下BPOfferService这个类,里面有读写锁的使用。
心跳 DatanodeProtocol.sendHeartbeat() 方法是一个高频的操作,心跳的方法会调用getBlockPoolId() 方法
在 getBlockPoolId() 方法中调用了写锁
上面心跳的方法 DatanodeProtocol.sendHeartbeat() 是一个高频的操作,默认3 秒一次,也就是说会因为调用getBlockPoolId方法而高频的加了读锁。
3.1.2. 修改
在 BPOfferService 新增一个 变量 blockPoolID,使用 volatile 保证其可见性
blockPoolID 赋值
BPOfferService 通过
bpNSInfo
字段保存当前块池对应的命名空间信息。如果这个字段为空,则证明当前 BPServiceActor 对应 Namenode 是第一个响应握手请求的 Namenode。verifyAndSetNamespacelnfo() 方法会首先对
bpNSInfo
赋值,然后调用 Datanode.initBlockPool() 初始化命名空间对应块池的本地存储。如果如果不是第一次连接,则命名空间中定义的另一个 Namenode 已经提前注册并且初始化了本地存储getBlockPoolId() 方法优化如下: