Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据; 同时, Flume 提供对数据进行简单处理,并写到各种数据接收方(比如文本、HDFS、Hbase等)的能力

一、架构设计

Flume 运行的核心是 Agent。Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是 source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方

截屏2022-03-06 下午9.27.42

1.1. Source

用于采集数据,Source 是产生数据流的地方,同时 Source 会将产生的数据流传输到 Channel,这个有点类似于 Java IO 部分的 Channel

Source Desc
Avro Source 通过监听一个网络端口来接受数据,而且接受的数据必须是使用avro序列化框架序列化后的数据;
Thrift Source 监听Thrift端口并从外部Thrift客户端流接收事件
Exec Source 启动一个用户所指定的linux shell命令;采集这个linux shell命令的标准输出,作为收集到的数据,转为event写入channel
JMS Source 从JMS目标(例如队列或主题)读取消息;作为JMS应用程序,它应可与任何JMS提供程序一起使用,但仅经过ActiveMQ的测试;注意,应该使用plugins.d目录(首选),命令行上的–classpath或通过flume-env.sh中的FLUME_CLASSPATH变量将提供的JMS jar包含在Flume类路径中
Spooling Directory Source 监视一个指定的文件夹,如果文件夹下有没采集过的新文件,则将这些新文件中的数据采集,并转成event写入channel;注意:spooling目录中的文件必须是不可变的,而且是不能重名的!否则,source会loudly fail!
Taildir Source 监视指定目录下的一批文件,只要某个文件中有新写入的行,则会被tail到;它会记录每一个文件所tail到的位置,记录到一个指定的positionfile保存目录中,格式为json(如果需要的时候,可以人为修改,就可以让source从任意指定的位置开始读取数据);它对采集完成的文件,不会做任何修改
Kafka Source 就是用kafka consumer连接kafka,读取数据,然后转换成event,写入channel
NetCat Source 启动一个socket服务,监听一个端口;将端口上收到的数据,转成event写入channel
Sequence Generator Source 一个简单的序列生成器,它使用从0开始,递增1并在totalEvents处停止的计数器连续生成事件;当无法发送event到channel时会进行重试。通常用于测试。
Syslog Sources 读取系统日志数据生成event
HTTP Source 通过http post/get来接收数据,通常get用于测试;该source基于Jetty 9.4,并提供了设置其他特定于Jetty的参数的功能,这些参数将直接传递给Jetty组件
Stress Source 主要用于压测,用于可以配置要发生的event总数以及要发送成功event的最大数
Custom Source 自定义source
  • Spool Source

    在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到 spool的监控目录。log4j 有一个 TimeRolling 的插件,可以把 log4j 分割的文件到 spool 目录。基本实现了实时的监控。 Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)

  • Taildir Source

    监听指定目录下的一批文件,只要某个文件被写入,那么就会被tail到.

    这里原理其实就是source会记录每个文件所读取到的位置,然后记录到一个指定的positionfile目录文件中,通常为json格式,而且是可见的,因此可以人为修改。由于该种机制,可以实现从任意指定位置读取数据,所以这个source是可以保障可靠性的。但是会有数据重复的问题

1.2. Channel

Channel 被设计为 Event 中转临时缓冲区,存储 Source 收集并且没有被 Sink 读取的 Event,为平衡 Source 收集和 Sink 读取的速度,可视为 Flume 内部的消息队列。

Channel 线程安全并且具有事务性,支持 Source 写失败写,和 Sink 读失败重复读的操作。常见的类型包括 Memory Channel,File Channel,Kafka Channel。

Channel Desc
Memory Channel event存储在内存中,且可以配置最大值。对于需要高吞吐而且可以容忍数据丢失的情况下,可以选择该channel
JDBC Channel event被持久到数据库中,目前支持derby.适用于可恢复的场景
Kafka Channel agent利用kafka作为channel数据缓存;kafka channel要跟 kafka source、 kafka sink区别开来;kafka channel在应用时,可以没有source
File Channel event被缓存在本地磁盘文件中;可靠性高,不会丢失;但在极端情况下可能会重复数据
Spillable Memory Channel event存储在内存和磁盘上。内存充当主存储,磁盘充当溢出。

1.2.1. 种类

  • Memory Channel

    读写速度,但是存储数据量,Flume 进程挂掉、服务器停机或者重启都会导致数据丢失。资源充足、不关心数据丢失的场景下可以用。

  • File Channel

    将 event 写入磁盘文件,与 Memory Channel 相比存储容量大,无数据丢失风险。File Channel 数据存储路径可以配置多磁盘文件路径,通过磁盘并行写入提高 File Channel 性能。Flume 将 Event 顺序写入到 File Channel 文件的末尾。可以在配置文件中通过设置 maxFileSize 参数配置数据文件大小,当被写入的文件大小达到上限的时候,Flume 会重新创建新的文件存储写入 Event。当一个已经关闭的只读数据文件的 Event 被读取完成,并且 Sink 已经提交读取完成的事务,则 Flume 把存储该数据的文件删除

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    a1.sources = s1
    a1.channels = c1 c2
    a1.sinks = k1

    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=5555
    a1.sources.s1.selector.type = replicating

    a1.channels.c1.type=file
    a1.channels.c1.checkpointDir=/root/checkpoint
    a1.channels.c1.dataDirs=/root/flume_data

    a1.channels.c2.type=memory

    a1.sinks.k1.type=logger

    a1.sources.s1.channels=c1 c2
    a1.sinks.k1.channel=c2
  • Kafka Channel

    Memory Channel 有很大的丢数据风险,而且容量一般,File Channel 虽然能缓存更多的消息,但如果缓存下来的消息还没写入 Sink,此时 Agent 出现故障则 File Channel 中的消息一样不能被继续使用,直到该 Agent 恢复。而 Kafka Channel 容量大,容错能力强。

    有了 Kafka Channel 可以在日志收集层只配置 Source 组件和 Kafka 组件,不需要再配置 Sink 组件,减少了日志收集层启动的进程数,有效降低服务器内存、磁盘等资源的使用率。而日志汇聚层,可以只配置 Kafka Channel 和 Sink,不需要再配置 Source。

    kafka.consumer.auto.offset.reset,当 Kafka 中没有 Consumer 消费的初始偏移量或者当前偏移量在 Kafka 中不存在(比如数据已经被删除)情况下 Consumer 选择从 Kafka 拉取消息的方式,earliest 表示从最早的偏移量开始拉取,latest 表示从最新的偏移量开始拉取,none 表示如果没有发现该 Consumer 组之前拉取的偏移量则抛出异常。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    a1.sources = s1
    a1.channels = c1 c2
    a1.sinks = k1

    a1.sources.s1.type=avro
    a1.sources.s1.bind=0.0.0.0
    a1.sources.s1.port=5555
    a1.sources.s1.selector.type = replicating

    a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers=192.168.254.139:9092
    a1.channels.c1.kafka.topic=test
    #a1.channels.c1.kafka.consumer.group.id=flume-channel
    a1.channels.c1.parseAsFlumeEvent=false

    a1.channels.c2.type=memory

    a1.sinks.k1.type=logger

    a1.sources.s1.channels=c1 c2
    a1.sinks.k1.channel=c2

1.2.2. Channel 选择器

Source 发送的 Event 通过 Channel 选择器来选择以哪种方式写入到 Channel 中,Flume 提供三种类型 Channel 选择器,分别是复制、复用和自定义选择器。

  1. 复制选择器: 一个 Source 以复制的方式将一个 Event 同时写入到多个 Channel 中,不同的 Sink 可以从不同的 Channel 中获取相同的 Event,比如一份日志数据同时写 Kafka 和 HDFS,一个 Event 同时写入两个 Channel,然后不同类型的 Sink 发送到不同的外部存储。
  2. 复用选择器: 需要和拦截器配合使用,根据 Event 的头信息中不同键值数据来判断 Event 应该写入哪个 Channel 中。

1.3. sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者发送到另一个Flume Agent。Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。如果写入失败,将缓冲区 takeList 中的数据归还给 Channel。

Sink是完全事务性的,在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量 Event 一旦成功写出到存储系统或下一个Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除Event

Sink Desc
HDFS Sink 数据被最终发往hdfs;可以生成text文件或 sequence 文件,而且支持压缩;支持生成文件的周期性roll机制:基于文件size,或者时间间隔,或者event数量;目标路径,可以使用动态通配符替换,比如用%D代表当前日期;当然,它也能从event的header中,取到一些标记来作为通配符替换
Hive Sink 可将text或json数据直接存储到hive分区表
Logger Sink 数据输出到日志中,通常用于debug
Avro Sink avro sink用来向avro source发送avro序列化数据,这样就可以实现agent之间的级联
Thrift Sink 同avro sink
IRC Sink 同avro sink
File Roll Sink 数据存储到本地文件系统
Null Sink 直接丢弃
HBaseSink 数据存储到hbase中
HBase2Sink 等同于hbase 2版本的HBaseSink
AsyncHBaseSink 异步模式写入hbase
MorphlineSolrSink 该接收器从Flume事件中提取数据,对其进行转换,并将其几乎实时地加载到Apache Solr服务器中,后者再为最终用户或搜索应用程序提供查询
ElasticSearchSink 直接存储到es中
Kite Dataset Sink 将事件写入Kite数据集。该接收器将反序列化每个传入事件的主体,并将结果记录存储在Kite数据集中。它通过按URI加载数据集来确定目标数据集
Kafka Sink 存储到kafka中
HTTP Sink 将接收到的数据通过post请求发生到远程服务,event内容作为请求体发送
Custom Sink 自定义sink

Sink groups

一个 Agent 中,多个sink可以被组装到一个组中,而数据在组内多个sink之间发送。接收处理器可以在组内提供负载均衡的功能,或者是在临时故障的情况下实现从一个接收器转移到另一个接收器上。具体的接收器模式见下表格

Processor Desc
default 默认的接收处理器仅接受一个sink,当然用户也没有必要为了一个sink去创建processor
Failover 故障转移模式,即一个组内只有优先级高的sink在工作,而其他的sink处于等待中
load_balance 负载均衡模式,允许channel中的数据在一组sink中的多个sink之间进行轮转,具体的策略有:round-robin(轮流发送);random(随记发送)
Custom processor 自定义处理器

二、Flume 数据流

数据流的基本单元,由一个装载数据的字节数组(byte payload)和一系列可选的字符串属性来组成(可选头部).

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据

Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。 Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。 Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去

多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是Flume强大之处

截屏2021-10-20 上午10.14.45

三、特点

3.1. 数据不丢失

根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是Channel采用memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。

Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。

3.2. 可靠性

当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:

  • end-to-end: 收到数据 agent 首先将 event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送
  • Store on failure: 数据写到本地,待恢复后,继续发送
  • Besteffort: 数据发送到接收方后,不会进行确认

Sink groups 允许组织多个sink到一个实体上。 Sink processors能够提供在组内所有Sink之间实现负载均衡的能力,而且在失败的情况下能够进行故障转移从一个Sink到另一个Sink

四、事务

消息传递的可靠性保证的三种方式,可以类比 kafka,

  1. At-Least-Once
  2. At-Most-Once
  3. Exactly-Once

Flume 采用的是 At-Least-Once 策略,Flume 保障 at-least-once 的基础就是Transaction,Flume的transaction是有生命周期的,分别是start、commit、rollback和close.

4.1. Flume 推送事务流程

  • doPut

    将批数据先写入临时缓冲区putList,不是来一条Event就处理,是来一批Event才处理

  • doCommit

    检查Channel内存队列空间是否充足,充足则直接写入Channel内存队列,不足则doRollback回滚数据到putList,等待重新传递,回滚数据指的是putList的Event索引回退到之前

  • doRollback

    回滚数据

4.2. Flume 拉取事务流程

  • doTake

    先将数据取到临时缓冲区takeList

  • doCommit

    如果数据全部发送成功,则清除临时缓冲区takeList

  • doRollback

    数据发送过程中如果出现异常,将临时缓冲区takeList中的数据doRollback归还给Channel内存队列,等待重新传递

结合Flume事务流程,细化 Flume 内部整体的流程机制:

  1. Flume source 组件会监听数据源,接收数据,然后根据一定的规则做预处理或者分发
  2. 根据用户是否配置拦截器interceptors,对接收的event做一些定制化的处理
  3. 根据用户是否配置选择器(默认是选择多路复用选择器),基于选择器决定将event分发给具体哪个channel;这里就使用到了上小节的事务机制,当事务提交成功后,source会对源数据进行ack
  4. 根据用户是否配置processor,来选择是否需要进行容错或者负载均衡
  5. 基于配置的策略,sink读取channel中的event,然后写到目标源,这里使用事务机制,当写入成功后,会执行事务提交操作

五、FLUME 拓扑结构

5.1. 故障转移

FailoverSink Processor 会通过配置维护了一个优先级列表。保证每一个有效的事件都会被处理。在这配置中,要设置sinkgroups processor 为 failover,需要为所有的 sink 分配优先级,所有的优先级数字必须是唯一的。此外,failover time 的上限可以通过 maxpenalty 属性来进行设置。

故障转移的工作原理是将连续失败的sink分配到一个失败的消息队列中这个sink不会做任何事。

假如在一个 Agent 中,一个 Channen 对应三个 Sink, Sink1 和 Sink2,Sink3,优先级顺序为 Sink1>Sink2>Sink3,所以 Channel 过来的数据总是先给 Sink3,如果 Sink3 挂掉以后的话在的最大回退周期(也即是Failover Sink Processor 中的 processor.maxpenalty 属性值)内自动起来的话 Channel 传来的数据是不会再考虑Sink1 的,会传给下一个优先级比较高的Sink2,如果 Sink1 恢复的时间大于 processor.maxpenalty 这个值的话那么在下一次传输数据给Sink的时候还是会优先的考虑给Sink1.

使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能。

截屏2021-10-20 上午10.45.00

在主节点的 hadoop102 上的在 /opt/module/flume/job 目录下创建group2目录

  1. 在 group2 目录中创建 flume-netcat-flume.conf 文件
    配置1个netcat source和 1 个channel、1个sink group(2个sink),分别输送给 flume-flume-console1 和flume-flume-console2

    在新创建的 flume-netcat-flume.conf 文件中添加如下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
  1. 在 group2 目录中创建 flume-flume-console1.conf 在文件中填入如下内容, 目的是配置上级 Flume 输出的 Source,输出是到本地控制台
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  1. 创建 flume-flume-console2.conf 添加如下内容作用是配置上级 Flume 输出的 Source,输出是到本地控制台,只不过他的优先级高于上面一个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
BASH
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

复制一下hadoop102,因为每次启动一个 flume 的话都会处于阻塞状态,然后分别开启对应配置文件: flume-flume-console2,flume-flume-console1,flume-netcat-flume

在这里插入图片描述

分别在 4-hadoop102,3-hadoop102,2-hadoop102 的/opt/module/flume目录下**执行

1
2
3
4
5
6
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
1
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
1
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
1

2-hadoop102 上面执行完

1
2
SHELL
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

后发现3-hadoop1024-hadoop102 的机器上会在之前的启动上的日志后面显示绑定和连接上了Avro Sink

在这里插入图片描述

1-hadoop102机器上输入nc localhost 44444 回车以后随便输入内容

在这里插入图片描述

由于当时规定的是 k2 的优先级高,k2 对应的是 flume-flume-console2.conf 这个配置文件,而这个配置文件是在4-hadoop102 开着的,4-hadoop102 上面看一下也确实收到了消息。

在这里插入图片描述

因为 k1 的优先级低,所以3-hadoop102 上面的 flume-flume-console1.conf 自然就收不到消息,kill 掉4-hadoop102 上面的 flume 进程发现只有 3-hadoop102 的机器能接收到消息,这也验证了我们的故障转移的配置是可行的。

在这里插入图片描述