终于到 NameNode启动的最后一步啦~, 开始看 startCommonService()~~~~

一、概述

FSNamesystem 是 NameNode 核心成员变量用来管理元数据,FSNamesystem的 startCommonServices() 将启动一些磁盘检查、安全模式等一些后台服务及线程等一些公共服务,同时 NameNode 的 RPC 服务就是在这里启动,包括 clientRpcServer 和 serviceRpcServer,NameNode要跟其它服务进行 rpc 通信远程调用其它服务的方法

二、源码分析

开始源码调试~~~

2.1. namesystem.startCommonServices(conf, haContext)

FSNamesystem 是 NameNode 核心成员变量用来管理元数据(实现对DataNode、Block的管理以及读写日志),创建NameNodeResourceChecker、激活BlockManager等

1
void startCommonServices(Configuration conf, HAContext haContext) throws IOException 

2.1.1. 磁盘资源检查

1
nnResourceChecker = new NameNodeResourceChecker(conf);

这个构造方法主要是:

  • 声明 namenode 容忍的磁盘大小的阈值

  • 封装好需要检查的磁盘路径(fsimage和edits)

  • 将需要检查的磁盘路径通过 addDirToCheck() 添加到 volumes 这个 map 集合里面,然后在 FSNameSystem 中有一个 NameNodeResourceMonitor 线程,不断的调用 checkAvailableResources 方法来检查磁盘的资源情况

  1. 在构造方法中变量的初始化

    • volumes

      初始化 volumes Map,用来存放需要进行磁盘空间检查的路径

      1
      volumes = new HashMap<String, CheckedVolume>();
    • duReserved

      最小容忍度

      磁盘空间默认最小是 100M

      1
      duReserved = 	conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);

    • extraCheckedVolumes

      除了本地的 edits 外,本地的其他目录,其实就是指 fsimage

      1
      2
      Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
      .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
    • localEditDirs

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      Collection<URI> localEditDirs = Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
      @Override
      public boolean apply(URI input) {
      if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
      return true;
      }
      return false;
      }
      });
    • minimumRedundantVolumes

      1
      2
      3
      minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);

2.1.2. 检查可用资源

NameNodeResourceChecker 是实际资源检查的类,调用的是类里面的 hasAvailableDiskSpace 方法,监控 NameNode 主机上的磁盘还是否可用。如果一旦发现有资源不足的情况,会使 NameNode 进入安全模式。如果随后返回的状态代表资源大小到达可使用的级别,那么这个线程就使NameNode退出安全模式。依照这个注释,去解读run()方法的代码逻辑:在一个 while 循环里,首先判断资源是否可用,如果不可用,日志里就会发出一个警告信息,然后调用 enterSafeMode(); 进入安全模式。

1
checkAvailableResources();

通过时序图 将核心代码定位于 NameNodeResourcePolicy#areResourcesAvailable()中,主要对volumns里面的url进行检查,看看这些url路径是否可用,是否满足继续运行的最小资源数

  1. 局部变量

    • 需要的数量

      如果不是当前 namenode 需要的资源(edits路径),则redundantResourceCount++;

      1
      int requiredResourceCount = 0;
    • 冗余的数量

      如果目录不可用,则 disabledRedundantResourceCount++

      1
      int redundantResourceCount = 0;
    • 无法使用的冗余资源数

      如果当前的路径是namenode需要的,并且空间不够100M,那么返回false,直接进入安全模式

      1
      int disabledRedundantResourceCount = 0;
  2. 返回值

    冗余的数量 - 无法使用的冗余资源数 >= 继续运行所需要的最少冗余资源数

    1
    return redundantResourceCount - disabledRedundantResourceCount >=minimumRedundantResources;

    同时 NameNode 启动时会启动一个守护线程 SafeModeMonittor,每隔一秒钟去检查资源,如果资源满足条件,则推出安全模式

2.1.3. 设置安全状态

无论是 ActiveNamenode 还是 StandbyNameNode 在刚启动时,都是处于安全模式下

1
prog.beginPhase(Phase.SAFEMODE);

2.1.4. 设置所有的 block

设置所有的 block,用于后面判断是否进入安全模式

1
2
3
4
//处于一个等待汇报blocks的状态
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, getCompleteBlocksTotal());
//设置所有的block,用于后面判断是否进入安全模式
setBlockTotal();
1
safeMode.setBlockTotal((int)getCompleteBlocksTotal());
  1. getCompleteBlocksTotal()

    获取系统中的 COMPLETE 块总数。

    NameNode 端的 block 有四种状态

    • UnderConstruction(正在被写入的状态)
    • UnderRecovery(正在被恢复的块)
    • Committed(已经确定好它的字节大小与generation stamp值(可理解为版本号))
    • Complete(写入执行操作结束状态)

    启动的时候 ,namenode认为只有block状态为 Complete,才会被读取

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private long getCompleteBlocksTotal() {
    // Calculate number of blocks under construction
    long numUCBlocks = 0;
    readLock();
    numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
    try {
    return getBlocksTotal() - numUCBlocks;
    } finally {
    readUnlock();
    }
    }
    • 获取所有正在构建的 block 块

      1
      leaseManager.getNumUnderConstructionBlocks();

      该方法所有操作在一个 for 循环里,Lease对应HDFS一个目录, for 循环遍历所有的目录

      • 读取文件

        1
        cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
      • 获取一个文件的所有block

        1
        BlockInfoContiguous[] blocks = cons.getBlocks();
      • 遍历文件的所有 block,记录正在构建的block

        1
        2
        3
        4
        5
        for(BlockInfoContiguous b : blocks) {
        if(!b.isComplete())
        // 记录正在构建的 block
        numUCBlocks++;
        }
      • 获取所有正常使用的 block 个数 (Complete)

        1
        2
        3
        public long getBlocksTotal() {
        return blockManager.getTotalBlocks();
        }
  2. 设置 block 块

1
safeMode.setBlockTotal()
  • 得到满足安全模式阈值条件所需的块数

  • 填充复制队列之前所需的块数

  • 检查是否需要进入安全模式

    安全模式是HDFS的一种工作状态,处于安全模式的状态下,只向客户端提供文件的只读视图,不接受对命名空间的修改;同时 NameNode 节点也不会进行数据块的复制或者删除,如:副本的数量小于正常水平。

    根据这个方法判断是否要进入安全模式

    1
    checkMode();
    • 判断是否进入安全模式

      1
      if (smmthread == null && needEnter())
      • needEnter()

        1
        2
        3
        4
        5
        private boolean needEnter() {
        return (threshold != 0 && blockSafe < blockThreshold) ||
        (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
        (!nameNodeHasResourcesAvailable());
        }

        进入安全模式有3个条件,任何一个满足,都会进入安全模式

        • threshold = 0.999f 不为 null && 汇报过来的数据块数 < 安全阈值(1000=999)

          如果有 1000 个block,那么阈值blockThreshold=999 但是如果集群启动,datanode汇报过来的累计的块数小于<999,那么就会进入安全模式

          1
          threshold != 0 && blockSafe < blockThreshold
        • 启动集群后,存活的datanode个数小于datanodeThreshold(默认0),则进入安全模式

          1
          datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold
        • 检查磁盘,是否大于100M

          1
          ! nameNodeHasResourcesAvailable()
      • enter()

        reached = -1 代表退出安全模式,reached = 0 代表进入安全模式

        1
        2
        3
        4
        private void enter() {
        this.reached = 0;
        this.reachedTimestamp = 0;
        }
      • 退出安全模式

        1
        2
        3
        4
        if (!isOn() || extension <= 0 || threshold <= 0) {  // don't need to wait
        this.leave(); // 退出安全模式
        return;
        }
        • 判断是否开启复制以及删除数据块的操作

          1
          2
          3
          if (!isPopulatingReplQueues() && shouldPopulateReplQueues()) {
          initializeReplQueues();
          }
        • 退出安全模式

          这儿我们把 reached = -1 就代表退出安全模式啦~

          1
          2
          3
          reached = -1;//退出安全模式
          reachedTimestamp = -1;
          safeMode = null;
    • 启动 SafeModeMonitor 线程

      启动 SafeModeMonitor 线程,每隔1秒去查看下,是否可以退出安全模式

      1
      2
      3
      4
      5
      if (smmthread == null) {
      smmthread = new Daemon(new SafeModeMonitor());
      smmthread.start();
      reportStatus("STATE* Safe mode extension entered.", true);
      }

      主要步骤:

      • 判断是否满足退出安全模式,如果满足则推出安全模式

        1
        2
        3
        4
        5
        6
        if (safeMode.canLeave()) {
        // Leave safe mode.
        safeMode.leave();
        smmthread = null;
        break;
        }

        通过 3 个条件来判断是否可以离开安全模式

        • reached是否为 -1

          1
          2
          3
          if (reached == 0) {
          return false;
          }
        • 在满足最小副本条件之后,namenode还需要处于安全模式的时间(30s)

          1
          2
          3
          4
          5
          //extension 默认30s,也就是满足最低副本系数之后,离开安全模式的时间,这个时间用于等待剩余数据节点的数据块上报
          if (monotonicNow() - reached < extension) {
          reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
          return false;
          }
        • needEnter里面的3个条件

          1
          2
          3
          4
          5
          private boolean needEnter() {
          return (threshold != 0 && blockSafe < blockThreshold) ||
          (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
          (!nameNodeHasResourcesAvailable());
          }

2.1.4. 激活BlockManager

启动 BlockManager 里面关于block副本处理的后台线程

1
blockManager.activate(conf);
  1. 启动等待复制线程
  2. 启动管理心跳服务

2.2. 启动 RPC 服务

1
rpcServer.start();

2.2.1. 启动 clientRpcServer

1
clientRpcServer.start();

2.2.2. 启动 serviceRpcServer

1
2
3
if (serviceRpcServer != null) {
serviceRpcServer.start();
}

三、总结

所以,startCommonServices() 主要作用:

  1. 将需要检查的URL添加到volumes中 , 后台有线程会一直执行hasAvailableDiskSpace来检查
  2. checkAvailableResources(); 进行资源检查
  3. NameNode启动,进入到safemode阶段,处于一个等待汇报blocks的状态
  4. 汇报所有的block,用于后面判断是否进入安全模式
  5. 激活BlockManager