一、概述

二、实现

write

2.1. 写数据

当用户代码通过 DistributedFileSystem.create() 方法创建了一个新文件,并获取了 DFSOutputStream 输出流对象之后,就可以在输出流对象上调用 write() 方法写数据了。

DFSOutputStream.write() 方法继承自 FSOutputSummer.write() 方法,用于向数据流管道中写入指定大小的数据以及校验和,是客户端写数据操作的入口。write()方法会循环调用 write1() 方法每次发送一个校验块数据,直到所有数据发送完毕。

write1() 方法每次写入一个校验块数据,如果数据长度不足一个校验块,则写入 buffer 缓冲区。

这个方法首先将数据写入 buffer 缓冲区,当 buffer 中的数据达到一个校验块(chunk) 的大小时,则调用 flushBuffer() 方法将缓存中的校验快写入底层 I/O 流。如果 buffer 为空,并且写入的数据大于一个校验块的大小时,则调用 writeChecksumChunk() 直接将校验块大小的数据写入 I/O 流,不经过 buffer 缓存。

FSOutputSummer.write

无论是 writeChecksumChunk() 还是 flushBuffer() 方法,最终都调用了 writeChunk() 方法将一个校验块以及校验块对应的校验和写入 I/O 流。writeChunk() 在 FSOutputSummer 中是一个抽象方法,最终由 DFSOutputStream 实现。

2.1.1. writeChunk

writeChunk() 方法首先构造一个 Packet 对象保存数据包,然后将校验块数据以及校验和写入 Packet 对象中。当 Packet 对象写满时(每个数据包都可以写入 maxChunks 个校验块),则调用 waitAndQueueCurrentPacket() 方法将当前 Packet 对象放入输出队列 dataQueue 中等待 DataStreamer 线程的处理。如果当前数据块中的所有数据都已经发送完毕,则发送一个空数据包标识所有数据已经发送完毕。

当 Client 写入的字节流数据达到一个数据包的长度时,DFSOutputStream 会构造一个 DFSPacket 对象保存这个要发送的数据包。如果当前数据块中的所有数据包都发送完毕了,DFSOutputStream 会发送一个空的数据包标识数据块发送完毕。新构造的 DFSPacket 对象会被放到 DFSOutputStream.dataQueue 队列中,由 DFSOutputStream 的内部线程类 DataStreamer 处理。

  1. 检查 DFSClient 状态

  2. 检查 DFSOutputStream 状态

  3. 创建数据包

    1
    2
    3
    4
    5
    6
    if (currentPacket == null) {
    currentPacket = createPacket(packetSize, chunksPerPacket, bytesCurBlock, currentSeqno++, false);
    if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=...");
    }
    }
  4. 将当前校验数据、校验块写入数据包中

    1
    2
    3
    4
    5
    6
    7
    8
    // 向 packet 写入校验和 4 byte
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // 向 packet 写入数据 chunk 512 byte
    currentPacket.writeData(b, offset, len);
    // 累计一共有多少个 chunk (如果 packet 写满了 127 chunk , 那就是一个完整的 packet )
    currentPacket.incNumChunks();
    // 迭代累加 bytesCurBlock (如果当前的 bytesCurBlock == 128M,说明当前写完了一个 block 文件块)
    bytesCurBlock += len;
  5. 如果当前数据包已经满了,或者写满了一个数据块,则将当前的数据包放入发送队列中

    1
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || bytesCurBlock == blockSize) {...}
    • 写满一个数据包时,把数据包加入队列

      当写满一个数据包时,DFSOutputStream 会调用 waitAndQueueCurrentPacket() 方法将数据包放入发送队列 dataQueue 中,等待 streamer 线程将数据包发送到数据流管道中。streamer 线程将 dataQueue 中的数据包发送出去之后,会将该数据包放入 ackQueue 队列中,只有当 ResponseProcessor 线程收到下游节点传来的 ack 消息之后,才会将数据包从 ackQueue 中移除。

      1
      waitAndQueueCurrentPacket();
    • 如果之前的 chunk 没有写满,则当前 packet 只发送这个 append trunk。发送完后,将 checksum 和 appendChunk 重置

    • 恢复 packet 大小,注意这里避免了越过数据块的边界

    • 如果写满了一个教据块的长度,则发送一个空 packet 作为标识,表明发送了一个完整的教据块

2.2. streamer 线程处理数据

streamer 线程将 dataQueue 中的数据包发送到数据流管道中,将该数据包放入 ackQueue 队列中,只有当 ResponseProcessor 线程收到下游节点传来的 ack 消息之后,才会将数据包从 ackQueue 中移除。

2.3. 错误处理