Hadoop-组件-HDFS-源码学习-集群启动-NameNode 启动-startCommonServices
终于到 NameNode启动的最后一步啦~, 开始看 startCommonService()~~~~
一、概述
FSNamesystem 是 NameNode 核心成员变量用来管理元数据,FSNamesystem的 startCommonServices() 将启动一些磁盘检查、安全模式等一些后台服务及线程等一些公共服务,同时 NameNode 的 RPC 服务就是在这里启动,包括 clientRpcServer 和 serviceRpcServer,NameNode要跟其它服务进行 rpc 通信远程调用其它服务的方法
二、源码分析
开始源码调试~~~
2.1. namesystem.startCommonServices(conf, haContext)
FSNamesystem 是 NameNode 核心成员变量用来管理元数据(实现对DataNode、Block的管理以及读写日志),创建NameNodeResourceChecker、激活BlockManager等
1 | void startCommonServices(Configuration conf, HAContext haContext) throws IOException |
2.1.1. 磁盘资源检查
1 | nnResourceChecker = new NameNodeResourceChecker(conf); |
这个构造方法主要是:
声明 namenode 容忍的磁盘大小的阈值
封装好需要检查的磁盘路径(fsimage和edits)
将需要检查的磁盘路径通过 addDirToCheck() 添加到 volumes 这个 map 集合里面,然后在 FSNameSystem 中有一个 NameNodeResourceMonitor 线程,不断的调用 checkAvailableResources 方法来检查磁盘的资源情况
在构造方法中变量的初始化
volumes
初始化 volumes Map,用来存放需要进行磁盘空间检查的路径
1
volumes = new HashMap<String, CheckedVolume>();
duReserved
最小容忍度
磁盘空间默认最小是 100M
1
duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
extraCheckedVolumes
除了本地的 edits 外,本地的其他目录,其实就是指 fsimage
1
2Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));localEditDirs
1
2
3
4
5
6
7
8
9
10
11Collection<URI> localEditDirs = Collections2.filter(
FSNamesystem.getNamespaceEditsDirs(conf),
new Predicate<URI>() {
public boolean apply(URI input) {
if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
return true;
}
return false;
}
});minimumRedundantVolumes
1
2
3minimumRedundantVolumes = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
2.1.2. 检查可用资源
NameNodeResourceChecker 是实际资源检查的类,调用的是类里面的 hasAvailableDiskSpace 方法,监控 NameNode 主机上的磁盘还是否可用。如果一旦发现有资源不足的情况,会使 NameNode 进入安全模式。如果随后返回的状态代表资源大小到达可使用的级别,那么这个线程就使NameNode退出安全模式。依照这个注释,去解读run()方法的代码逻辑:在一个 while 循环里,首先判断资源是否可用,如果不可用,日志里就会发出一个警告信息,然后调用 enterSafeMode(); 进入安全模式。
1 | checkAvailableResources(); |
通过时序图 将核心代码定位于 NameNodeResourcePolicy#areResourcesAvailable()
中,主要对volumns里面的url进行检查,看看这些url路径是否可用,是否满足继续运行的最小资源数
局部变量
需要的数量
如果不是当前 namenode 需要的资源(edits路径),则redundantResourceCount++;
1
int requiredResourceCount = 0;
冗余的数量
如果目录不可用,则
disabledRedundantResourceCount++
1
int redundantResourceCount = 0;
无法使用的冗余资源数
如果当前的路径是namenode需要的,并且空间不够100M,那么返回false,直接进入安全模式
1
int disabledRedundantResourceCount = 0;
返回值
冗余的数量 - 无法使用的冗余资源数 >= 继续运行所需要的最少冗余资源数
1
return redundantResourceCount - disabledRedundantResourceCount >=minimumRedundantResources;
同时 NameNode 启动时会启动一个守护线程 SafeModeMonittor,每隔一秒钟去检查资源,如果资源满足条件,则推出安全模式
2.1.3. 设置安全状态
无论是 ActiveNamenode 还是 StandbyNameNode 在刚启动时,都是处于安全模式下
1 | prog.beginPhase(Phase.SAFEMODE); |
2.1.4. 设置所有的 block
设置所有的 block,用于后面判断是否进入安全模式
1 | //处于一个等待汇报blocks的状态 |
![](https://hexo-blog-3494269.oss-cn-beijing.aliyuncs.com/images/%E6%88%AA%E5%B1%8F2022-03-12%20%E4%B8%8A%E5%8D%8812.42.50.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_130)
1 | safeMode.setBlockTotal((int)getCompleteBlocksTotal()); |
getCompleteBlocksTotal()
获取系统中的 COMPLETE 块总数。
NameNode 端的 block 有四种状态
- UnderConstruction(正在被写入的状态)
- UnderRecovery(正在被恢复的块)
- Committed(已经确定好它的字节大小与generation stamp值(可理解为版本号))
- Complete(写入执行操作结束状态)
启动的时候 ,namenode认为只有block状态为 Complete,才会被读取
1
2
3
4
5
6
7
8
9
10
11private long getCompleteBlocksTotal() {
// Calculate number of blocks under construction
long numUCBlocks = 0;
readLock();
numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
try {
return getBlocksTotal() - numUCBlocks;
} finally {
readUnlock();
}
}获取所有正在构建的 block 块
1
leaseManager.getNumUnderConstructionBlocks();
该方法所有操作在一个 for 循环里,Lease对应HDFS一个目录, for 循环遍历所有的目录
读取文件
1
cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
获取一个文件的所有block
1
BlockInfoContiguous[] blocks = cons.getBlocks();
遍历文件的所有 block,记录正在构建的block
1
2
3
4
5for(BlockInfoContiguous b : blocks) {
if(!b.isComplete())
// 记录正在构建的 block
numUCBlocks++;
}获取所有正常使用的 block 个数 (Complete)
1
2
3public long getBlocksTotal() {
return blockManager.getTotalBlocks();
}
设置 block 块
1 | safeMode.setBlockTotal() |
得到满足安全模式阈值条件所需的块数
填充复制队列之前所需的块数
检查是否需要进入安全模式
安全模式是HDFS的一种工作状态,处于安全模式的状态下,只向客户端提供文件的只读视图,不接受对命名空间的修改;同时 NameNode 节点也不会进行数据块的复制或者删除,如:副本的数量小于正常水平。
根据这个方法判断是否要进入安全模式
1
checkMode();
判断是否进入安全模式
1
if (smmthread == null && needEnter())
needEnter()
1
2
3
4
5private boolean needEnter() {
return (threshold != 0 && blockSafe < blockThreshold) ||
(datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
(!nameNodeHasResourcesAvailable());
}进入安全模式有3个条件,任何一个满足,都会进入安全模式
threshold = 0.999f 不为 null && 汇报过来的数据块数 < 安全阈值(1000=999)
如果有 1000 个block,那么阈值blockThreshold=999 但是如果集群启动,datanode汇报过来的累计的块数小于<999,那么就会进入安全模式
1
threshold != 0 && blockSafe < blockThreshold
启动集群后,存活的datanode个数小于datanodeThreshold(默认0),则进入安全模式
1
datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold
检查磁盘,是否大于100M
1
! nameNodeHasResourcesAvailable()
enter()
reached = -1 代表退出安全模式,reached = 0 代表进入安全模式
1
2
3
4private void enter() {
this.reached = 0;
this.reachedTimestamp = 0;
}退出安全模式
1
2
3
4if (!isOn() || extension <= 0 || threshold <= 0) { // don't need to wait
this.leave(); // 退出安全模式
return;
}判断是否开启复制以及删除数据块的操作
1
2
3if (!isPopulatingReplQueues() && shouldPopulateReplQueues()) {
initializeReplQueues();
}退出安全模式
这儿我们把 reached = -1 就代表退出安全模式啦~
1
2
3reached = -1;//退出安全模式
reachedTimestamp = -1;
safeMode = null;
启动 SafeModeMonitor 线程
启动 SafeModeMonitor 线程,每隔1秒去查看下,是否可以退出安全模式
1
2
3
4
5if (smmthread == null) {
smmthread = new Daemon(new SafeModeMonitor());
smmthread.start();
reportStatus("STATE* Safe mode extension entered.", true);
}主要步骤:
判断是否满足退出安全模式,如果满足则推出安全模式
1
2
3
4
5
6if (safeMode.canLeave()) {
// Leave safe mode.
safeMode.leave();
smmthread = null;
break;
}通过 3 个条件来判断是否可以离开安全模式
reached是否为 -1
1
2
3if (reached == 0) {
return false;
}在满足最小副本条件之后,namenode还需要处于安全模式的时间(30s)
1
2
3
4
5//extension 默认30s,也就是满足最低副本系数之后,离开安全模式的时间,这个时间用于等待剩余数据节点的数据块上报
if (monotonicNow() - reached < extension) {
reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
return false;
}needEnter里面的3个条件
1
2
3
4
5private boolean needEnter() {
return (threshold != 0 && blockSafe < blockThreshold) ||
(datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
(!nameNodeHasResourcesAvailable());
}
2.1.4. 激活BlockManager
启动 BlockManager 里面关于block副本处理的后台线程
1 | blockManager.activate(conf); |
- 启动等待复制线程
- 启动管理心跳服务
2.2. 启动 RPC 服务
1 | rpcServer.start(); |
2.2.1. 启动 clientRpcServer
1 | clientRpcServer.start(); |
2.2.2. 启动 serviceRpcServer
1 | if (serviceRpcServer != null) { |
三、总结
所以,startCommonServices() 主要作用:
- 将需要检查的URL添加到volumes中 , 后台有线程会一直执行hasAvailableDiskSpace来检查
- checkAvailableResources(); 进行资源检查
- NameNode启动,进入到safemode阶段,处于一个等待汇报blocks的状态
- 汇报所有的block,用于后面判断是否进入安全模式
- 激活BlockManager