Hadoop-组件-HDFS-源码学习-数据读写-写文件-上传-写数据
一、概述
二、实现
2.1. 写数据
当用户代码通过 DistributedFileSystem.create() 方法创建了一个新文件,并获取了 DFSOutputStream 输出流对象之后,就可以在输出流对象上调用 write() 方法写数据了。
DFSOutputStream.write() 方法继承自 FSOutputSummer.write() 方法,用于向数据流管道中写入指定大小的数据以及校验和,是客户端写数据操作的入口。write()方法会循环调用 write1() 方法每次发送一个校验块数据,直到所有数据发送完毕。
write1() 方法每次写入一个校验块数据,如果数据长度不足一个校验块,则写入 buffer 缓冲区。
这个方法首先将数据写入 buffer 缓冲区,当 buffer 中的数据达到一个校验块(chunk) 的大小时,则调用 flushBuffer() 方法将缓存中的校验快写入底层 I/O 流。如果 buffer 为空,并且写入的数据大于一个校验块的大小时,则调用 writeChecksumChunk() 直接将校验块大小的数据写入 I/O 流,不经过 buffer 缓存。
无论是 writeChecksumChunk() 还是 flushBuffer() 方法,最终都调用了 writeChunk() 方法将一个校验块以及校验块对应的校验和写入 I/O 流。writeChunk() 在 FSOutputSummer 中是一个抽象方法,最终由 DFSOutputStream 实现。
2.1.1. writeChunk
writeChunk() 方法首先构造一个 Packet 对象保存数据包,然后将校验块数据以及校验和写入 Packet 对象中。当 Packet 对象写满时(每个数据包都可以写入 maxChunks 个校验块),则调用 waitAndQueueCurrentPacket() 方法将当前 Packet 对象放入输出队列 dataQueue 中等待 DataStreamer 线程的处理。如果当前数据块中的所有数据都已经发送完毕,则发送一个空数据包标识所有数据已经发送完毕。
当 Client 写入的字节流数据达到一个数据包的长度时,DFSOutputStream 会构造一个 DFSPacket 对象保存这个要发送的数据包。如果当前数据块中的所有数据包都发送完毕了,DFSOutputStream 会发送一个空的数据包标识数据块发送完毕。新构造的 DFSPacket 对象会被放到 DFSOutputStream.dataQueue 队列中,由 DFSOutputStream 的内部线程类 DataStreamer 处理。
检查 DFSClient 状态
检查 DFSOutputStream 状态
创建数据包
1
2
3
4
5
6if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket, bytesCurBlock, currentSeqno++, false);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=...");
}
}将当前校验数据、校验块写入数据包中
1
2
3
4
5
6
7
8// 向 packet 写入校验和 4 byte
currentPacket.writeChecksum(checksum, ckoff, cklen);
// 向 packet 写入数据 chunk 512 byte
currentPacket.writeData(b, offset, len);
// 累计一共有多少个 chunk (如果 packet 写满了 127 chunk , 那就是一个完整的 packet )
currentPacket.incNumChunks();
// 迭代累加 bytesCurBlock (如果当前的 bytesCurBlock == 128M,说明当前写完了一个 block 文件块)
bytesCurBlock += len;如果当前数据包已经满了,或者写满了一个数据块,则将当前的数据包放入发送队列中
1
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || bytesCurBlock == blockSize) {...}
写满一个数据包时,把数据包加入队列
当写满一个数据包时,DFSOutputStream 会调用 waitAndQueueCurrentPacket() 方法将数据包放入发送队列 dataQueue 中,等待 streamer 线程将数据包发送到数据流管道中。streamer 线程将 dataQueue 中的数据包发送出去之后,会将该数据包放入 ackQueue 队列中,只有当 ResponseProcessor 线程收到下游节点传来的 ack 消息之后,才会将数据包从 ackQueue 中移除。
1
waitAndQueueCurrentPacket();
如果之前的 chunk 没有写满,则当前 packet 只发送这个 append trunk。发送完后,将 checksum 和 appendChunk 重置
恢复 packet 大小,注意这里避免了越过数据块的边界
如果写满了一个教据块的长度,则发送一个空 packet 作为标识,表明发送了一个完整的教据块
2.2. streamer 线程处理数据
streamer 线程将 dataQueue 中的数据包发送到数据流管道中,将该数据包放入 ackQueue 队列中,只有当 ResponseProcessor 线程收到下游节点传来的 ack 消息之后,才会将数据包从 ackQueue 中移除。