HDFS 会定时将 editlog 文件与 fsimage 文件合并以产生新的 fsimage 文件。Namenode 可以直接从 fsimage 加载元数据,而不用读取与合并 editlog 中的记录了。Namenode 命名空间的重建效率会大大提高,同时 Namenode 的启动时间也会相应减少。但是合并 fsimage 与editlog 以产生新的fsimage 文件是一项非常消耗 I/O 和 CPU 资源的操作。在执行检查点操作期间,Namenode 还需要限制其他用户对 HDFS 的访问和操作。为了预防这种情况的发生,HDFS 将检查点操作从Active Namenode 移动到 StandbyNamenode

Editlog 文件会变得非常大,甚至将磁盘空间写满。在 Namenode启动过程中一个很重要的部分就是逐条读取editlog文件中的记录,之后与 Namenode 命名空间合并。巨大的 editlog 文件会导致 Namenode 的启动时间过长,为了解决这个问题,HDFS引入了检查点机制

一、概述

截屏2022-03-18 下午11.25.39

  1. 变量初始化

    • tailerThread

      实例化 EditLogTailerThread

    • conf

    • namesystem

    • editLog

      从 NameSystem 中获取 editLog

    • lastLoadTimeMs

    最新加载 edit log 时间 lastLoadTimestamp 初始化为当前时间

    • logRollPeriodMs

      StandBy NameNode 滚动编辑日志的时间间隔 logRollPeriodMs

    • activeAddr

      1
      2
      3
      4
      5
      6
      if (logRollPeriodMs >= 0) {
      // 调用 getActiveNodeAddress() 方法初始化 Active NameNode 地址activeAddr
      this.activeAddr = getActiveNodeAddress();
      } else {
      ...
      }
    • sleepTimeMs

      StandByNameNode 检查是否存在可以读取的新的最终日志段的时间间隔 sleepTimeMs,取参数 dfs.ha.tail-edits.period,参数未配置默认为 1 min

      截屏2022-08-16 22.34.09
  2. start()

    start() 方法中启动了EditLogTailer 线程, StandbyNameNode 使用 EditLogTailer 线程来负责向远程的 QuorumJournalNode 读取新的EditLog文件

    截屏2022-03-17 下午4.30.51

    • 滚动条件

      1
      2
      3
      if (tooLongSinceLastLoad() && lastRollTriggerTxId < lastLoadedTxnId) {
      ...
      }
      • 达到 StandByNameNode 滚动编辑日志的时间间隔(2min)

        1
        2
        3
        4
        private boolean tooLongSinceLastLoad() {
        return logRollPeriodMs >= 0 &&
        (monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ;
        }

        13

      • 上一次编辑日志滚动开始时的最新事务ID < StandBy NameNode加载的最高事务ID

        1
        lastRollTriggerTxId < lastLoadedTxnId
    • roll 操作

      NameNode roll操作会告知 QJM 也进行roll操作,这样,Standby Namenode就可以拉取到这个segment

      1
      2
      3
      4
      5
      6
      7
      8
      private void triggerActiveLogRoll() {
      try {
      getActiveNodeProxy().rollEditLog();
      lastRollTriggerTxId = lastLoadedTxnId;
      } catch (IOException ioe) {
      LOG.warn("Unable to trigger a roll of the active NN", ioe);
      }
      }
      1. 获得 ActiveNameNode 的代理,并调用其 rollEditLog() 方法滚动编辑日志

        • 检查 NameNode 是否已经启动

          1
          checkNNStartup();
        • 检查权限

          1
          namesystem.checkSuperuserPrivilege();
        • 滚动 edits

          1
          namesystem.rollEditLog();
          1. 检查权限操作

          2. 检查状态

            1
            checkOperation(OperationCategory.JOURNAL);
          3. 检查当前 NameNode 是否是安全模式

          4. 回滚

            1
            getFSImage().rollEditLog();
            • 双缓冲写日志

              1
              getEditLog().rollEditLog();
              1. 把处于正在写入的 editlog 文件变成完整的

                1
                endCurrentLogSegment(true);
              2. 获取最新的事务ID+1

                开始给下一个edit文件命名

                1
                long nextTxId = getLastWrittenTxId() + 1;
              3. 创建 editLogStream

                这个流会在刷元数据的时候使用到

                1
                startLogSegment(nextTxId, true);

                方法调用了 journalSet.startLogSegment() 方法在所有 editlog 文件的存储路径上构造输出流,并将这些输出流保存在 FSEditLog 的字段journalSet.journals 中。

                journalSet 的 journals 字段是一个 JournalAndStream 对象的集合,这个集合中的每一个 JournalAndStream 对象都封裝了一个 JournalManager,以及这个 JournalManager 打开的 editlog 文件的输出流

                startLogSegment()方法最后会将 FSEditlog.curSegmentTxId 字段(FSEditlog 当前正在写入 txid) 设置为传入的segmentTxid, 同时将 editlog 的状态更改为 IN_SEGMENT 状态。

                1
                2
                curSegmentTxId = segmentTxId;
                state = State.IN_SEGMENT;
            • 将当前事务ID记录到 dfs/name/current/seen_txid 里面,这个 id 就是当前 edit 文件的名称:fsimage_0000000000000000003

              1
              storage.writeTransactionIdFileToStorage(getEditLog().getCurSegmentTxId());
      2. 将上次 StandByNameNode加载的最高事务ID赋值给上次编辑日志滚动开始时的最新事务ID

        1
        lastRollTriggerTxId = lastLoadedTxnId
  • 拉取远程的 segment 文件

    1
    doTailEdits()
    • 加锁

    • 加载 fsimage

      1
      FSImage image = namesystem.getFSImage();
    • 获取最新的事务ID

      1
      long lastTxnId = image.getLastAppliedTxId();
    • 得到 JournalNode 上 editlog 的输入流

      FSEditLog 会调用 QuorumJournalManager.selectInputStreams0 方法打开共享存储上的 editlog 文件,selectlnputStreams() 方法首先会发送 RPC 请求 QJournalProtocol.getEditLogManifest() 到集群中的所有 JournalNode 上,以获取集群中所有 JournalNode 上保存的 editlog 段落文件信息。之后对于 JournalNode上保存的每一个 editlog文件,selectInputStreams() 都会构造一个EditLogFilelnputStream 对象,然后将这些对象都封装在一个 RedundantEditLogInputStream 输入流对象中。

      1
      streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);

      Standby Namenode 就可以使用 RedundantEditLogInputStream 从任意一个 JournalNode 上读取每个 editlog 段落了。如果其中一个JournalNode 失败了,RedundantEditLogInputStream 会自动切换到另一个保存了这个 editlog 段落的 JournalNode 上。

      • 去 JournalNode 上加载日志

        1
        2
        3
        4
        5
        editsLoaded = image.loadEdits(streams, namesystem);
        if (editsLoaded > 0) {
        //更新最后一次我们从共享目录成功加载一个非零编辑的时间
        lastLoadTimeMs = monotonicNow();
        }
      • 标识当前进度为加载edit log文件

        1
        2
        StartupProgress prog = NameNode.getStartupProgress();
        prog.beginPhase(Phase.LOADING_EDITS);
        • 加载edit log最新事务ID

          1
          long prevLastAppliedTxId = lastAppliedTxId;
        • FSEditLogLoader负责对当前这一轮的多个stream(editStreams)进行读取并加载到内存

          1
          FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
          • fsNamesys
          • lastAppliedTxId
        • loadFSEdits

        loader 会不断从远程读取新的 op,同时从 op 中提取 txId 来更新自己的 txId,从而让 StandbyNamenode 的 txid 逐渐向前递增, 从loader中获取当前已经 load 过来的最新的 txId, 更新到 FSImage 对象,用 lastAppliedTxId 记录当前已经成功读取并加载的位置
        这样,当 EditLogTailer 的下一轮运行开始的时候,就从这个 lastAppliedTxId 开始请求 edit log 文件,因为这个位置之前的操作已经完成了同步和加载。对应—>FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);

        1
        2
        3
        4
        5
        try {
        loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
        } finally {
        lastAppliedTxId = loader.getLastAppliedTxId();
        }
        • 标识进度 Phase.LOADING_EDITS
        StartupProgress prog = NameNode.getStartupProgress();
         Step step = createStartupProgressStep(edits);
        prog.beginStep(Phase.LOADING_EDITS, step);
         
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21

        - loadEditRecords

        - op = in.readOp()

        从编辑日志输入流 in 中读取操作符 op,如果操作符 op 为空,说明没有元数据,直接跳出循环,并返回

        - applyEditLogOp()

        在内存中将 edit 文件和 image 进行操作

        - OP_MKDIR

        创建目录日志,然后在内存中维护自己的元数据

        - editIn.getLastTxId()

        ```java
        if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
        lastAppliedTxId = editIn.getLastTxId();
        }
      • 记录同步过来的最后一个transactionId

        1
        lastLoadedTxnId = image.getLastAppliedTxId();
      • 去除写锁

        1
        namesystem.writeUnlock();