Flink-源码学习-文件系统
Flink 在集群启动的一个操作就是初始化文件系统。Flink 中的文件系统主要有两个用途,
- Flink实现容错,存储程序状态,恢复数据,主要通过 FsDataOutputStream 实例来实现。
- 保存链接状态,避免每次创建链接的资源消耗。
一、概述
Apache Flink 使用文件系统来消费和持久化地存储数据,以处理应用结果以及容错与恢复。最常用的文件系统:本地存储,hadoop-hdfs, Amazon S3,阿里云 OSS 和 Azure Blob Storage。
文件使用的文件系统通过其 URI Scheme 指定。例如:
- file:///home/user/text.txt 表示一个在本地文件系统中的文件
- hdfs://namenode: 50010/data/user/text.txt 表示一个在指定 HDFS 集群中的文件。
文件系统在每个进程实例化一次,然后进行缓存池化,从而避免每次创建流时的配置开销,并强制执行特定的约束,如连接流的限制。
1.1. 本地文件系统
Flink 原生支持本地机器上的文件系统,包括任何挂载到本地文件系统的 NFS 或 SAN 驱动器,默认即可使用,无需额外配置。本地文件可通过 file://URI Scheme 引用。
1.2. 外部文件系统
Apache Flink 支持下列文件系统:
Amazon S3 对象存储
由 flink-s3-fs-presto 和 flink-s3-fs-hadoop 两种替代实现提供支持。这两种实现都是独立的,没有依赖项。
阿里云对象存储
由 flink-oss-fs-hadoop 支持,并通过 oss://URI scheme 使用
Google Cloud Storage
由 gcs-connector 支持,并通过 gs://URI
上述文件系统可以并且需要作为[插件]使用。使用外部文件系统时,在启动 Flink 之前需将对应的 jar 文件从 opt 目录复制到 Flink 发行版 plugin 目录下的某一文件夹。
二、实现
Flink 文件系统由类 org.apache.flink.core.fs.FileSystem
表示,该类定义了访问与修改文件系统中文件与对象的方法,是 Flink 文件系统的抽象基类,子类实现的可以是本地文件系统或者分布式文件系统
2.1. LocalFileSystem
LocalFilesystem 继承了 FileSystem,它使用的是本地文件系统来实现
2.2. HadoopFileSystem
HadoopFileSystem 继承了 FileSystem,它使用的是 HDFS 文件系统来实现
三、初始化文件系统
四、使用
4.1. OSS
五、自定义文件系统插件
要添加一个新的文件系统:
添加文件系统实现,它应是
org.apache.flink.core.fs.Filesystem
的子类。添加 Factory 类,以实例化该文件系统并声明文件系统所注册的 scheme,它应是
org.apache.flink.core.fs.FileSystemFactory
的子类。添加 Service Entry。创建文件
META-INF/services/org.apache.flink.core.fs.FileSystemFactory
,文件中包合文件系统 Factory 类的类名。
在插件检索时,文件系统 Factory 类会由一个专用的 Java 类加载器加载,从而避免与其他类或 Flink 组件冲突。
文件系统实现应避免使用 Thread.currentThread().getContextClassLoader 类加载器。