一、概述

数据节点最重要的功能就是管理物理存储上的数据块,数据节点通过存储 DataStorage 和文件系统数据集 FSDataset,将数据块的物理存储抽象为对象上的服务,流式接口就是构建在这个服务之上的、数据节点的另一个基本功能,HDFS 对读写操作提供了基于 TCP 流的数据访问接口 DataTransferProtocol。

二、架构设计

DataTransferProtocol 是用来描述写入或者读出 Datanode 上数据的流式接口。DataTransferProtocol 有两个子类: Sender 和 Receiver。

  1. Sender 类封装了 DataTransferProtocol 的调用操作,用于发起流式接口请求
  2. Receiver 类封装了Data TransferProtocol 的执行操作,用于响应流式接口请求。

2.1. DataTransferProtocol 调用流程

假设 DFSClient 发起了一个 DataTransferProtocol.readBlock() 操作,那么 DFSClient 会调用 Sender 类将这个请求序列化,并传输给远程的 Receiver。远程的 Receiver 类接收到这个请求后,会反序列化请求,然后调用执行代码执行读取操作。

DataTransferProtocol 调用流程

2.2. Sender 类

Sender 类用于发起 DataTransterProtocol 请求。Sender 类首先使用 ProtoBuf 将参数序列化,然后用一个枚举类 Op 描述
调用的是什么方法,最后将序列化后的参数和 Op 一起发送给接收方。

Op 是一个枚举类型,使用一个 byte 类型的变量 code 标识操作码。一个操作码对应 DataTransterProtocol 接口中的一个方法,例如操作码80 对应 DataTransferProtocol.writeBlock() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
public enum Op {
WRITE_BLOCK((byte)80),
READ_BLOCK((byte)81),
READ_METADATA((byte)82),
REPLACE_BLOCK((byte)83),
COPY_BLOCK((byte)84),
BLOCK_CHECKSUM((byte)85),
TRANSFER_BLOCK((byte)86),
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89);
}

2.3. Receiver 类

Receiver 类封装了 DataTransferProtocol 的执行操作,用于执行远程节点发起的流式接口请求。Receiver 是一个抽象类,它提供了解析 Sender 请求操作码的 readOp() 方法,以及处理 Sender 请求的 processOp() 方法。

这两个方法都是在 DataXceiver.run() 中循环调用

2.3.1. DataXceiverServer

Datanode 的流式接口就参考了 Socket 的实现,设计了 DataXceiverServer 以及 DataXceiver 两个对象,其中 DataXceiverServer 对象用于在 Datanode 上监听流式接口的请求,每当有 Client 通过 Sender 类发起流式接口请求时,DataXceiverServer 就会监听并接收这个请求,然后创建一个 DataXceiver 对象用于响应这个请求并执行对应的操作。

在 Java 的 Socket 实现中,首先需要创建一个 java.net.ServerSocket 对象,绑定到某个指定的端口,然后通过 ServerSocket.accept() 方法监听是否有连接请求到达这个端口。当有 Socket 连接请求时,ServerSocket.accept() 方法会返回一个 Socket 对象,之后服务器就可以通过这个 Socket 对象与客户端通信了。

2.3.2. DataXceiver

DataXceiverServer 主要用于监听并接收流式请求,然后创建并启动 DataXceiver 对象。DataXceiver 是 Receiver 的子类,DataTransferProtocol 真正的响应操作都是在 DataXceiver 类中实现。

DataXceiverServer 只处理客户端的连接请求,实际的请求处理和数据交换都交由 DataXceiver 处理。

Receiver.processOp() 方法用于处理流式接口的请求,它首先从数据流中读取序列化后的参数,对参数反序列化,然后根据操作码调用
DataTransferProtocol 中定义的方法,这些方法都是在 DataXceiver 中具体实现的。