一、概述

Flink SQL 可以将多种数据源或数据落地端映射为 table

二、实现

2.1. 架构设计

Flink 使用 SPI 机制加载Factory(DynamicTableSourceFactory DynamicTableSinkFactory同属 Factory)。在 flink-table-api-java-bridge 项目的 resources/META-INF/services 目录可以找到org.apache.flink.table.factories.Factory 文件,内容为:

1
2
3
org.apache.flink.table.factories.DataGenTableSourceFactory
org.apache.flink.table.factories.BlackHoleTableSinkFactory
org.apache.flink.table.factories.PrintTableSinkFactory

DataGenTableSourceFactory 为例:

2.1.1. DynamicTableSourceFactory

2.1.2. DynamicTableSource

DynamicTableSource 负责从外部系统创建出一个动态表。该接口包含有如下两个子接口: ScanTableSource 和 LookupTableSource

2.2. 实现

DataGenTableSourceFactory 为例:

创建 datagen 类型的 table:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',

-- optional options --

'rows-per-second'='5',

'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',

'fields.f_random.min'='1',
'fields.f_random.max'='1000',

'fields.f_random_str.length'='10'
)