Hadoop-组件-HDFS-源码学习-元数据管理-checkpoint-checkpointer 线程
Namenode 会定期将内存中的命名空间(文件目录树、文件目录元信息)保存到 Fsimage 文件中,以防止 Namenode 掉电或者进程崩溃。如果 Namenode 实时地将内存中的元数据同步到 fsimage 文件中,会非常消耗资源且造成 Namenode 运行缓慢。所以 Namenode 会先将命名空间的修改操作保存在 editlog 文件中,然后定期合并 Fsimage 和 editlog 文件。
Namenode 启动时,首先会将 fsimage 文件中记录的命名空间加载到 Namenode 内存中,然后再一条一条地将 editlog 文件中记录的更新操作加载并合并到命名空间中。接下来 Namenode 会等待各个 Datanode 向自己汇报数据块信息来组装 blockMap,从而离开安全模式。Namenode 每次启动时都会调用 FSImage.loadFsImage() 方法执行加载 fsimage 和 editlog 文件的操作。
一个正常大小的 editlog 文件往往在几十到几百个字节之间,但在某些极端的情况下,editlog 文件会变得非常大,甚至将磁盘空间写满,在 Namenode
启动过程中一个很重要的部分就是逐条读取 editlog 文件中的记录,之后与 Namenode 命名空间合并。巨大的 editlog 文件会导致 Namenode 的启动时间过长,为了解决这个问题,HDFS 引入了检查点机制 (checkpointing)。
是当NameNode 处于 standby 状态时用于从共享的 edit log 读取数据,它的构造是在 FSNamesystem 的startStandbyServices()方法中
StandbyNamenode 会持有一个 StandbyCheckpointer 类,这个类维护着一个叫作 Checkpointer Thread 的线程
1 | if (standbyShouldCheckpoint) { |
整个检查点逻辑是在 CheckpointerThread.doWork()方法中实现的。Checkpointer Thread 线程会每隔 1000*Math.min(checkpointCheckPeriod, checkpointPeriod)
秒检测是否执行一次检查点逻辑
1 | //定义checkpoint的时间间隔 1分钟 |
checkpointCheckPeriod 由
dfs.namenode.checkpoint.period
配置,checkpointPeriod 则由dfs.namenode.checkpoint.check.period
配置
2.3.1. 检查是否满足触发检查点操作
doWork()方法首先会判断是否满足检查点操作的两个条件,如果满足则调用doCheckpoint() 执行检查点操作
条件(1):如果超过了100万条日志没有做checkpoint , 则做一次checkpoint
获得最后一次往 JournalNode 写入的 txid 和最近一次做检查点的 txid 的差值
1
2
3
4
5
6
7final long uncheckpointed = countUncheckpointedTxns();
---
private long countUncheckpointedTxns() {
FSImage img = namesystem.getFSImage();
//在 FSImage 中的 saveFSImage 保存了上一次 chekpoint 的事务ID
return img.getLastAppliedOrWrittenTxId() - img.getStorage().getMostRecentCheckpointTxId();
}needCheckpoint = true
当最后一次往 JournalNode 写入的txid 和最近一次做检查,点的 txid 的差值大于或者等于 dfs.namenode.checkpoint.txns 配置的数量(默认为1000000)时做一次合并
1
2
3if (uncheckpointed >= checkpointConf.getTxnCount()) {
needCheckpoint = true;
}
条件(2):如果超过了1小时没有做checkpoint , 则做一次checkpoint
计算当前时间和上一次检查点操作时间的间隔,当时间间隔大于或者等于 dfs.namenode. checkpoint.period 配置的时间时做合并
1
2
3if (secsSinceLast >= checkpointConf.getPeriod()) {
needCheckpoint = true;
}
取消 checkpoint
如果调用了checkpoint的 stop 操作, 则此会做取消checkpoint的操作
1
2
3
4
5
6
7
8synchronized (cancelLock) {
if (now < preventCheckpointsUntil) {
canceledCount++;
continue;
}
assert canceler == null;
canceler = new Canceler();
}
2.3.2. doCheckpoint()
满足条件,需要做 checkpoint,整个检查点执行操作的逻辑都是在 doCheckpoint() 方法中实现的。
校验
doCheckpoint()方法首先获取当前保存的 fsimage 的 prevCheckpointTxld,然后获取最近更新的 editlog 的 thisCheckpointTxld,只有新的 thisCheckpointTxld 大于 prevCheckpointTxld,也就是当前命名空间有更新,但是并没有保存到新的 fsimage 文件中时,才进行一次检查点操作。
1
2
3
4
5
6
7
8// 获取当前 Standby Namenode 上保存的最新的 fsimage 对象
FSImage img = namesystem.getFSImage();
// 获取 fsimage 中保存的 txid,也就是上一次进行检查点操作时保存的 txid
long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
// 获取当前命名空间的最新的 txid,也就是收到的 edit1og 的最新的 txid
long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();
// 校验: 如果相等则没有必要执行检查点操作,当前 fsimage 已经是最新的了
assert thisCheckpointTxId >= prevCheckpointTxId;saveNamespace()
判断完成后,doCheckpoint() 会调用 saveNamespace() 方法将最新的命名空间保存到 fsimage 文件中。之后构造一个线程,将新产生的 fsimage 文件通过 HTTP 方式上传到 AvtiveNamenode 中。
重置(roll) editlog
saveNamespace()方法首先重置(roll)了 editlog 文件,将当前的 edit_ inprogress文件关闭并重命名,为与 fsimage 文件合并做准备。
1
2
3
4boolean editLogWasOpen = editLog.isSegmentOpen();
if (editLogWasOpen) {
editLog.endCurrentLogSegment(true);
}saveFSImagelnAlIDirs()
调用 saveFSImagelnAlIDirs() 将 fsimage 和 editlog 文件加载到命名空间中,并将更新的命名空间保存到新的 fsimage 文件中。最后开启新的 edit_ inprogress 文件,用于记录新的操作。
构造保存命名空间操作的上下文
1
SaveNamespaceContext ctx = new SaveNamespaceContext(source, txid, canceler);
在每一个保存路径上启动一个线程
Namenode 可以定义多个存储路径来保存 fsimage 文件,对于每一个存储路径,saveFSImagelnAlDirs() 方法都会启动一个线程负责在这个路径上保存 fsimage 文件,线程使用 FSImageSaver 类保存 fsimage 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15List<Thread> saveThreads = new ArrayList<Thread>();
//对于每一个 storage 目录,都创建一个独立线程,同时进行 fsimage 文件的生成,提高生成效率
for (Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
StorageDirectory sd = it.next();
// fsimage.ckpt
FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
Thread saveThread = new Thread(saver, saver.toString());
saveThreads.add(saveThread);
saveThread.start();
}
//等待所有的 saveThread 完成存储操作
waitForThreads(saveThreads);
saveThreads.clear();
//移除掉有错误的目录(FSImage里面的run,cache住的异常)
storage.reportErrorsOnDirectories(ctx.getErrorSDs());命名空间具体的保存操作是由 FSImageSaver 这个类承担,FSImageSaver 是 FSImage 中的内部类,也是一个线程类,它的run() 方法调用了 saveFSImage() 方法来保存 fsimage 文件。
saveFslmage()方法会使用一个 FSImageFormat.Saver 对象来完成保存操作,FSImageFormat.Saver 类会以 fsimage 文件定义的格式保存 Namenode 的命名空间信息,需要注意命名空间信息会先写入 fsimage.ckpt 文件中。saveFSImage()方法还会生成 fsimage 文件的md5 校验文件,以确保fsimage 文件的正确性。
获取当前命名空间中记录的最新事务的 txid
1
long txid = context.getTxId();
生成以 fsimage.ckpt 开头的中间文件
生成 MD5 文件
1
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
按照压缩配置,将 fsimage 存入到以fsimage.ckpt开头的中间文件中
1
2
3FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
FSImageCompression compression = FSImageCompression.createCompression(conf);
saver.save(newFile, compression);保存 md5 文件
1
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
保存 txid
此处保存,是为了做 checkpoint 的时候,能知道上一次 checkpoint 的位置
1
storage.setMostRecentCheckpointInfo(txid, Time.now());
将 fsimage.ckpt 改名为 fsimage
为了防止保存过程中出现错误,命名空间信息首先会被保存在一个 fsimage.ckpt 文件中,当保存操作全部完成之后,才会将 fsimage.ckpt 重命名为 fsimage 文件。
1
renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
最后一个参数 false,代表不需要 rename md5 文件,这是因为在 FSImageSaver 中生成临时文件的时候已经生成了最终的md5文件
清理 Namenode 元数据存储文件夹中过期的 editlog 文件和 fsimage 文件。
1
purgeOldStorage(nnf);
采用异步方式,将当前的 fsimage 文件发送到远程的Active NameNode
因为 fsimage 文件一般较大,传输较为耗时,因此会创建一个单独线程执行,通过 ExecutorService 创建了一个独立线程负责文件传输过程。
1
2
3
4
5
6
7
8
9ExecutorService executor = Executors.newSingleThreadExecutor(uploadThreadFactory);
Future<Void> upload = executor.submit(new Callable<Void>() {
public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
namesystem.getFSImage().getStorage(), imageType, txid, canceler);
return null;
}
});