HDFS 添加了Namenode 高可用性支持,但是所有的 HA 实现方案都依赖于一个保存 editlog 的共享存储。这个共享存储必须是高可用的,并且能够被集群中的所有 Namenode 同时访问。

一、概述

在 Quorum Journal 模式之前,HDFS 中使用最多的共享存储方案是 NAS+NFS。但是这种方案有个缺点,就是为了预防脑裂的情况,它要求有一个互斥脚本在 Namenode 发生故障切换时关闭上一个活动节点,或者阻止上一个活动节点访问共享存储。为了解决这个问题,cloudera 提供了 Quorum Journal 设计方案,这是一个基于 Paxos 算法实现的 HA 方案

二、实现

2.1. 相关类

  1. JournalNode

    运行在 N 台独立的物理机器上,它将 editlog 文件保存在 JournalNode 的本地磁盘上,同时 JouralNode 还对外提供 RPC 接口 QJournalProtocol 以执行远程读写 editlog 文件的功能。

  2. QuorumJournalManager

    运行在 Namenode 上(目前 HA 集群中只有两个 Namenode, Active & Standby),通过调用 RPC 接口QJouralProtocol 中的方法向JournalNode 发送写入、互斥、同步 editlog。

    当 Namenode 启动后,并且转换为 Active 状态时,会调用 FSEditLog.initJournalsForWrite() 方法初始化存放 editlog 文件的存储,基于 QJM 的共享存储方式,FSEditLog 会创建一个 QuorumJournalManager 对象管理基于Quorum Journal 的共享存储,然后将这个对象加入 FSEditlog.journalSet 集合统一管理。

  3. AsyncLogger

    AsyncLogger 定义了 QuorumJournalManager 与集群中一个 JournalNode 之间的所有异步通信接口。AsyncLogger包装了 QJournalProtocol 接口,并将 QJournalProtocol 接口适配成带有异步调用机制的接口。

  4. AsyncLoggerSet

    AsyncLoggerSet 类其实就是一组 AsyncLogger 对象的集合,AsyncLoggerSet 对集合中所有 AsyncLogger 对象上的调用进行了封装,并使用一个 QuorumCall 对象保存所有 JournalNode 返回的结果,实现了 QuorumJournalManager 与集群中所有 JournalNode 的异步通信

  5. QuorumCall

    QuorumCall 这个类包装了 AsyncLoggerSet 的整个异步调用过程。

2.2. 初始化 QuorumOutputStream 输出流

开始源码啦~~~

Hadoop-组件-HDFS-源码学习-元数据管理-持久化 EditLog-EditLogOutputStream 小节中介绍了 Namenode 会调用 startLogSegment() 方法初始化 editlog 文件的输出流,然后使用输出流对象向 editlog 文件写入数据。

对于 QuorumJournal 的共享存储,startLogSegment() 调用 QuorumJournalManager.startLogSegment() 在共享存储上打开一个 editlog 段落文件,创建 QuorumOutputStream 输出流对象并返回。

1
2
3
4
5
public EditLogOutputStream startLogSegment(long txId, int layoutVersion) throws IOException {
QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId, layoutVersion);
loggers.waitForWriteQuorum(q, startSegmentTimeoutMs, "startLogSegment(" + txId + ")");
return new QuorumOutputStream(loggers, txId,outputBufferCapacity, writeTxnsTimeoutMs);
}

代码简化时序图~~~

2.2.1. AsyncLoggerSet.startLogSegment()

AsyncLoggerSet 类其实就是一组 AsyncLogger 对象的集合,AsyncLoggerSet 对集合中所有 AsyncLogger 对象上的调用进行了封装,并使用一个 QuorumCall 对象保存所有 JournalNode 返回的结果。

1
QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId, layoutVersion);
  1. 遍历 AsyncLoggerSet

    AsyncLoggerSet.startLogSegment() 方法会遍历所有的 AsyncLogger 对象并调用 startLogSegment() 方法,AsyncLogger.startLogSegment() 底层调用了 IPCLoggerChannel.startLogSegment()

    1
    2
    3
    4
    5
    6
    7
    public QuorumCall<AsyncLogger, Void> startLogSegment(long txid, int layoutVersion) {
    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
    for (AsyncLogger logger : loggers) {
    calls.put(logger, logger.startLogSegment(txid, layoutVersion));
    }
    return QuorumCall.create(calls);
    }

    根据时序图点点点~~~来到 IPCLoggerChannel.startLogSegment() 方法

    IPCLoggerChannel.startLogSegment() 在 IPCLoggerChannel 的线程池对象 singleThreadExecutor 上提交了一个 Callable 任务,这个 Callable 任务在 JournalNode 对应的 QJournalProtocol 代理对象上调用了 startLogSegment()请求,并将线程执行结果封装在一个 ListenableFuture 对象中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public ListenableFuture<Void> startLogSegment(final long txid, final int layoutVersion) {
    return singleThreadExecutor.submit(new Callable<Void>() {
    @Override
    public Void call() throws IOException {
    getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
    synchronized (IPCLoggerChannel.this) {
    if (outOfSync) {
    outOfSync = false;
    }
    }
    return null;
    }
    });
    }

    AsyncLogger 类定义了 outOfSync 字段,outofSync 用于标识当前 JournalNode 上写 editlog 的操作是否出现了错误,
    例如在 editlog 段落(segment)中丢失了部分数据等。一旦出现这种情况 (outOfSync——true),AsyncLogger 对象就不可以继续在当前 editlog 段落中写入新的数据了,直到 Namenode 调用startLogSegment() 方法重新开启了一个新的 editlog 段落,startLogSegment() 方法会重置 outofSync 字段。

    IPCLoggerChannel 更形象的说是一个连向一个远程 JournalNode 的 Hadoop IPC 的代理类

  2. QuorumCall.create(calls)

    AsyncLoggerSet 对集合中所有 AsyncLogger 对象上的调用进行了封裝,使用一个QuorumCall 对象保存所有 JournalNode 返回的结果,QuorumCall 包装了 AsyncLoggerSet 的整个异步调用过程。每次 AsyncLoggerset 对象向 2N+1个 JournalNode 发送写日志请求都是异步的,AsyncLoggerSet 在请求发出之后并不同步等待每个 JournalNode 的返回值,而是在每个 AsyncLogger 返回的 ListenableFuture 对象上注册回调函数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
    Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
    final QuorumCall<KEY, RESULT> qr = new QuorumCall<KEY, RESULT>();
    for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
    Futures.addCallback(e.getValue(), new FutureCallback<RESULT>() {
    @Override
    public void onFailure(Throwable t) {
    qr.addException(e.getKey(), t);
    }
    @Override
    public void onSuccess(RESULT res) {
    qr.addResult(e.getKey(), res);
    }
    });
    }
    return qr;
    }

    每当调用返回时,都会执行回调函数把 QuorumCall 的响应计数加 1

    • 如果返回是 suecess,则把 success 计数加 1

      1
      2
      3
      4
      private synchronized void addException(KEY k, Throwable t) {
      exceptions.put(k, t);
      notifyAll();
      }
    • 如果返回是 failure,则把 failure 计数加 1

      1
      2
      3
      4
      private synchronized void addResult(KEY k, RESULT res) {
      successes.put(k, res);
      notifyAll();
      }

2.2.2. AsyncLoggerSet.waitForWriteQuorum()

之后 startLogSegment() 方法会调用 AsyncLoggerSet.waitForWriteQuorum()方法等待所有异步请求的返回值,waitForWriteQuorum()方法调用QuorumCall.waitFor()方法统计 JournalNode 的返回情况,如果大多数 JournalNode 返回成功,则 waitForWriteQuorum() 方法返回成功:如果执行成功的 JournalNode 数量不足,则抛出异常。

  1. 获取 majority

    1
    int majority = getMajoritySize();

​ majority = loggers.size() / 2 + 1,比如现在 5 台journalNode , 那么majority = 3

  1. waitFor

    有了回调函数,QuorumJournalManager 端只需要发出请求,然后循环检测 QuorumCall 对象是否有足够的 success 响应、足够的 exception 响应或者 timeout 即可。

    1
    2
    3
    4
    5
    q.waitFor(
    loggers.size(), // journalNode的数量 5
    majority, // 3
    majority, // 3,
    timeoutMs, operationName);//timeoutMs = 20s
    • 有足够的响应,则返回

      1
      if (minResponses > 0 && countResponses() >= minResponses) return;
    • 有是够的成功响应,则返回

      1
      if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
    • 有足够的异常响应,则返回

      1
      if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
  2. 当返回写操作成功达不到有一半以上的(≥N+1) JournalNode 时抛出异常

    1
    2
    3
    if (q.countSuccesses() < majority) {
    q.rethrowException("Got too many exceptions to achieve quorum size " + getMajorityString());
    }

最后 startLogSegment()方法会构造 QuorumOutputStream 对象并返回。

2.2. QuorumOutputStream#write()[FSEditLogOp—>bufCurrent]

获取了 QuorumOutputStream 输出流对象后,Namenode 会调用 QuorumOutputStream.write() 方法将 editlog 数据写入缓冲区。

QuorumOutputStream 的实现类似于 EditLogFileOutputStream,底层也使用了 EditsDoubleBuffer 双缓冲区。数据会先写入其中一个缓冲区中,

2.3. QurumOutpuStream#flushAndsync()[bufCurrent—>磁盘]

调用 QurumOutpuStream.flushAndsync() 时将缓沖区中的数据发送给 JournalNode。

1
2
3
4
if (logStream != null) {
// 将缓冲区数据刷到磁盘
logStream.flush(); //tmp/hadoop-angel/dfs/name/current
}

2.3.1. 获取元数据大小

已经处于 ready 状态的 buf 的大小

1
2
3
public int countReadyBytes() {
return bufReady.size();
}

2.3.2. 创建元数据副本

做一份元数据的副本,主要是高并发的异步操作,防止在写数据的时候造成这块儿数据改变,所以先写到一个buf副本

1
2
DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
buf.flushTo(bufToSend);

2.3.3. sendEdits()

flushAndSync() 调用了 AsyncLoggerSet.sendEdits() 方法向集群中的所有 JournalNode 写入数据。

1
QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits( segmentTxId, firstTxToFlush, numReadyTxns, data);
  1. AsyncLogger#sendEdits()

    AsyncLoggerSet.sendEdits() 方法会遍历所有的 AsyncLogger 对象并调用 sendEdits() 方法。

    每一个 AsyncLogger 代表一个 journalNode,往 journalNode 里面写元数据

    1
    2
    3
    4
    for (AsyncLogger logger : loggers) {
    ListenableFuture<Void> future = logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
    calls.put(logger, future);
    }
    • 校验元数据

      对待写的元数据长度做校验。待 flush 的元数据等待队列长度进行校验,查看是否超过队列的最大值

      1
      reserveQueueSpace(data.length);
    • 异步发送

      AsyncLogger.sendEdits() 方法调用 QJournalProtocol.journal() 方法向指定 JournalNode 写入editlog 文件,如果 JournalNode 在写入 editlog 时出现错误,则 sendEdits() 方法会将 AsyncLogger.outOfSync 字段设置为 true,之后 AsyncLogger 都不可以再向这个 editlog 段落文件写入数据了,直到调用 AsyncLogger.startLogSegment() 方法开启了新 editlog 段落时才可以恢复写操作。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      try {
      // 通过动态代理,调用 JournalNodeRpcServer(在服务器 jps 的时候就能看到这个)
      getProxy().journal(createReqInfo(), segmentTxId, firstTxnId, numTxns, data);
      } catch (IOException e) {
      synchronized (IPCLoggerChannel.this) {
      outOfSync = true;
      }
      throw e;
      }

      当 JournalNode 收到 QJournalProtocol.journal() 请求后,JournalNode 会执行如下操作:

      1. checkFormatted()

        检查是否格式化了 journalNode

      2. 验证 epoch number 是否正确

        1
        2
        3
        4
        5
        6
        7
        8
        private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
        checkRequest(reqInfo);
        if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
        throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
        " is not the current writer epoch " +
        lastWriterEpoch.get());
        }
        }
      3. 确认写入数据对应的 txid 是否连续

        1
        2
        3
        4
        5
        6
        private void checkSync(boolean expression, String msg,
        Object... formatArgs) throws JournalOutOfSyncException {
        if (!expression) {
        throw new JournalOutOfSyncException(String.format(msg, formatArgs));
        }
        }
      4. 向内存写数据

        1
        curSegment.writeRaw(records, 0, records.length);
      5. 交换内存

        1
        curSegment.setReadyToFlush();
      6. 将数据持久化到 JournalNode 的本地磁盘

        1
        curSegment.flush(shouldFsync);
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        public void flushAndSync(boolean durable) throws IOException {
        // ...
        if (doubleBuf.isFlushed()) {
        return;
        }
        preallocate(); // preallocate file if necessary
        doubleBuf.flushTo(fp);
        if (durable && !shouldSkipFsyncForTests && !shouldSyncWritesAndSkipFsync) {
        fc.force(false); // metadata updates not needed
        }
        }
    • 回调

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      Futures.addCallback(ret, new FutureCallback<Void>() {
      @Override
      public void onFailure(Throwable t) {
      unreserveQueueSpace(data.length);
      }
      @Override
      public void onSuccess(Void t) {
      //如果成功,则提交的 edit 从队列中移除
      unreserveQueueSpace(data.length);
      }
      });
  2. QuorumCall.create(calls)

    AsyncLoggerSet 对集合中所有 AsyncLogger 对象上的调用进行了封裝,使用一个QuorumCall 对象保存所有 JournalNode 返回的结果

    1
    return QuorumCall.create(calls);

2.3.4. waitForWriteQuorum()

1
loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");

调用 AsyncLoggerSet.waitForWriteQuorum() 方法等待集群中的大多数 JournalNode 响应写请求,同样是调用 QuorumCall.waitFor()方法,区别只是 operationName 不一样(一个是 startLogSegment(txId) ,一个是 sendEdits)

2.3.5. 记录最近提交的 txid

完成了写操作后,flushAndSync() 方法会调用 AsyncLoggerSet.setCommittedTxId() 记录最近提交的 txid。