0%

Flume快速上手

日志采集框架 Flume 的安装使用。


概述

Cloudera 开发的分布式日志收集系统 Flume,是 Hadoop 周边组件之一。其可以实时的将分布在不同节点、机器上的日志收集到 HDFS 中。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 Cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重。为了解决这些问题,2011 年 10 月 22 号,Cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是 Flume 被纳入 Apache 旗下,Cloudera Flume 改名为 Apache Flume。

  • Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,可以适用于大部分的日常数据采集场景。
  • Flume 可以采集单个文件、目录下的文件、Socket 数据包、Kafka 等各种形式源数据,又可以将采集到的数据下沉(Sink)到HDFS、HBase、Hive、Kafka 等众多外部存储系统中。
  • 一般的采集需求,通过对 Flume 的简单配置即可实现。
  • Flume 针对特殊场景也具备良好的自定义扩展能力。

核心组件变化

FLUM OG

FLUM OG 架构图

FLUM OG 的特点是:

  • FLUM OG 有三种角色的节点,如上图:代理节点(Agent)、收集节点(Collector)、主节点(Master)。
  • Agent 从各个数据源收集日志数据,将收集到的数据集中到 Collector,然后由收集节点汇总存入 HDFS。master 负责管理 agent,collector 的活动。
  • Agent、Collector 都称为 node,node 的角色根据配置的不同分为 logical node(逻辑节点)、physical node(物理节点)。对 logical nodes 和 physical nodes 的区分、配置、使用一直以来都是使用者最头疼的地方。
  • Agent、Collector 由 Source、Sink 组成,代表在当前节点数据是从 Source 传送到 Sink。如下图。

OG 节点组成图

FLUM NG

FLUM NG 架构图

对应于 OG 的特点,FLUM NG 的特点是:

  • NG 只有一种角色的节点:代理节点(Agent)。
  • 没有 Collector、Master 节点。这是核心组件最核心的变化。
  • 去除了 physical nodes、logical nodes 的概念和相关内容。
  • Agent 节点的组成也发生了变化。如下图,NG Agent 由 Source、Sink、Channel 组成。

NG 节点组成图

运行机制

Flume 分布式系统中最核心的角色是 agent,Flume 采集系统由一个个 agent 所连接起来形成,每一个 agent 相当于一个数据传递员。Agent 内部有三个组件:

  • Source:采集组件,用于跟数据源对接,以获取数据。
  • Sink:下沉组件,用于往下一级 agent 传递数据或者往最终存储系统传递数据。
  • Channel:传输通道组件,用于从 Source 将数据传递到 Sink。

Flume 传输数据的基本单元是 event。Event 由 header 头信息和载有日志数据的字节数组(body)构成:

  • Header 是容纳了 key-value 字符串对的无序集合,key 在集合里是唯一的。通过 header 可以在上下文路由进行扩展。
  • Body 中载有的数据对 Flume 来说是不透明的。

安装测试

安装Flume

在 node03 解压安装 Flume:

/export/softwares/
1
tar -zxvf flume-ng-1.6.0-cdh5.14.0.tar.gz -C /export/servers/

测试需求

作为 Flume 的测试方案,node02 使用 Telent 向 node03 发送一些网络数据,node03 通过 Flume 采集网络数据,并将日志打印在控制台。

分析

Source 使用 netcat,Sink 使用 logger,Channel 可用 memory

Flume测试方案

NetCat Source

NetCat Source,组件 type 为 netcat

  • 监听给定的端口,并将每一行文本转换为一个 event。行为类似于 nc -k -l [host] [port]
  • 换句话说,它打开指定的端口并监听数据。期望提供的数据是换行符分隔的文本。每行文本都将变成 Flume event,并通过连接的 Channel 发送。
属性 默认值 解释
channels
type 组件类型,这个是: netcat
bind 要监听的 hostname 或 IP 地址
port 监听的端口
max-line-length 512 解析成 event body 的每行数据的最大字节数
ack-every-event true 对收到的每个 event 都响应 OK
selector.type replicating 可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.* Channel 选择器的相关属性,取决于selector.type 的值
interceptors 该 Source 所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

Logger Sink

Logger Sink,组件 type 为 logger

  • 在 INFO 级别记录 event,通常用于测试、调试。
  • 该 Sink 是唯一一个不需要额外配置就能输出 event 原始内容的 Sink。参照 输出原始数据到日志
属性 默认值 解释
channel
type 组件类型,这个是: logger
maxBytesToLog 16 event body 输出到日志的最大字节数,超出的部分会被丢弃

配置测试方案

创建 netcat.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/netcat-logger.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 命名此agent上的各组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置Source组件(netcat)
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.153.102
a1.sources.r1.port = 7777

# 描述和配置Sink组件(logger)
a1.sinks.k1.type = logger

# 描述和配置Channel组件(memory)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Flume

有了这个配置文件,可以启动 Flume,等待接收消息:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -c conf -f conf/netcat.conf -n a1  -Dflume.root.logger=INFO,console
  • -c <conf-dir><conf-dir> 目录需要包括一个 Shell 脚本 flume-env.sh 和一个 log4j 配置文件 log4j.properties
  • -f <conf-file>:指定采集方案。
  • -n <agent>:指定 agent。
  • -Dflume.root.logger=INFO,console:命令中传递了这个 Java 参数,用来在没有自定义环境脚本下强制 Flume 将日志信息显示在控制台。

启动Telnet

在 node02 上使用 Telnet 发送数据:

1
2
3
4
5
# 使用Telnet模拟数据发送
yum -y install telnet

# 启动Telnet
telnet node03 7777

发送数据:

Flume_telnet

查看控制台输出:

Flume_netcat_flume-ng.png

采集目录到HDFS

需求

某服务器的特定目录下,每当有新文件出现,就把文件采集到HDFS中去。

分析

Source 使用 spooldir,Sink 使用 hdfs,Channel 可用 memoryfile

Spooling Directory Source

Spooling Directory Source,组件 type 为 spooldir

  • 监视目录中出现的新文件,采集新文件中的内容。
  • 采集过的文件,会被重命名(默认添加一个“.COMPLETED”后缀)以标示完成。
  • Spooling Directory Source 是可靠的,即使 Flume 重新启动或终止也不会丢失数据。为了获取这种可靠性,放入监视目录中的文件名必须唯一
  • Flume 会检测以下问题,如果出现,将会在日志文件中打印错误并停止处理:
  • 对放入监视目录的文件进行写入操作。
  • 放入监视目录中的文件名不唯一。
Property Name Default Description
channels
type 组件类型,这个是: spooldir
spoolDir Flume Source监视的目录,该目录下的文件会被 Flume 采集
fileSuffix .COMPLETED 被 Flume 采集完成的文件被重命名的后缀。1.txt被Flume采集完成后会重命名为1.txt.COMPLETED
deletePolicy never 是否删除已完成采集的文件,可选值:neverimmediate
fileHeader false 是否添加文件的绝对路径名(绝对路径+文件名)到 header 中。
fileHeaderKey file 添加绝对路径名到 header 里面所使用的 key(配合上面的fileHeader一起使用)
basenameHeader false 是否添加文件名(只是文件名,不包括路径)到 header 中
basenameHeaderKey basename 添加文件名到 header 里面所使用的 key(配合上面的basenameHeader 一起使用)
includePattern ^.*$ 指定会被采集的文件名的正则表达式,跟下面的 ignorePattern 可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说 ignorePattern 的优先级更高
ignorePattern ^$ 指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePatternincludePattern 两个正则都匹配到,这个文件会被忽略。
trackerDir .flumespool 用于存储与文件处理有关的元数据的目录。如果此路径不是绝对路径,则将其解释为相对于 spoolDir 的相对路径。
consumeOrder oldest 设定监视目录内文件的采集顺序。默认是 oldest(也就是修改时间最早的文件最先被采集,如果修改时间相同,采集词典顺序最小的文件),可选值有: oldestyoungestrandom 。当使用 oldestyoungest 的时候,Flume 会扫描整个目录以选择“最旧”/“最新”的文件,如果目录中有大量文件,这可能会很慢。当使用 random 时,如果一直有新文件产生,老文件可能要过很久才会被收集
pollDelay 500 轮询新文件时使用的延迟,单位:毫秒
recursiveDirectorySearch false 是否收集子目录下的新文件
maxBackoff 4000 如果 Channel 已满,则在两次尝试写入 Channel 之间最长的等待时间(退避时间),单位:毫秒。Source 会从较低的退避时间开始,并在每次 Channel 抛出 ChannelException 后以指数方式增加这个值,直到达到 maxBackoff 指定的值。
batchSize 100 批量转移到 Channel 的粒度
inputCharset UTF-8 解析器读取文件时使用的编码(解析器会把所有文件当做文本读取)
decodeErrorPolicy FAIL 当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用 Unicode U+FFFDIGNORE :删除无法解析的字符。
deserializer LINE 指定用于将文件解析成 event 的解析器。默认将每行解析为一个 event。所有解析器必须实现 EventDeserializer.Builder 接口
deserializer.* 解析器的相关属性,因 event 解析器而异
bufferMaxLines (Obselete) 现在忽略此选项
bufferMaxLineLength 5000 (Deprecated) 提交缓冲区中一行的最大长度。请改用deserializer.maxLineLength
selector.type replicating 可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.* Channel 选择器的相关属性,取决于selector.type 的值
interceptors 该 Source 所使用的拦截器,多个用空格分开
interceptors.* 拦截器相关的属性配置

HDFS Sink

HDFS Sink,组件 type 为 hdfs

  • 支持创建文本和序列文件,以及压缩这两种类型的文件。
  • 可以根据经过的时间、文件大小或 event 数量定期滚动文件。滚动(roll)是指将临时文件以重命名的方式生成最终目标文件,会关闭当前文件并创建一个新的临时文件来存储新数据。
  • 它还根据时间戳或产生 event 的机器之类的属性对数据进行分桶/分区。
  • HDFS 目录路径可能包含转义字符串(如%{host}/%H/%M),这些转义字符串将由 HDFS Sink 自动替换,生成对应的目录或文件名。
Alias Description
%{host} 替换 event header 中 key 为 host 对应的 value。这个 host 可以是任意的,比如 %{aabc} 将读取 header 中 key 为 aabc 对应的 value
%t Unix 时间戳(精度:毫秒)
%a locale’s short weekday name (Mon, Tue, …)
%A locale’s full weekday name (Monday, Tuesday, …)
%b locale’s short month name (Jan, Feb, …)
%B locale’s long month name (January, February, …)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d 月份中的天(00到31)
%e 月份中的天(1到31)
%D 日期,与 %m/%d/%y 相同 ,例如:02/09/19
%H 小时(00到23)
%I 小时(01到12)
%j 年中的天数(001到366)
%k 小时(0到23),注意跟 %H的区别
%m 月份(01到12)
%n 月份(1到12,不填充0)
%M 分钟(00到59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S 秒(00到59)
%y 一年中的最后两位数(00到99),比如1998年的 %y 就是98
%Y 年(2010)
%z 数字时区(比如:-0400)
%[localhost] Agent 实例所在主机的 hostname
%[IP] Agent实例所在主机的 IP
%[FQDN] Agent 实例所在主机的完全限定域名

注意:%[localhost]%[IP]%[FQDN] 这三个转义符实际上都是用 Java 的 API 来获取的,在一些网络环境下可能会获取失败。

属性名 默认值 解释
channel
type 组件类型,这个是: hdfs
hdfs.path HDFS目录路径(例如:hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Flume在HDFS文件夹下创建新文件的固定前缀
hdfs.fileSuffix Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefix Flume正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix .tmp Flume正在写入的临时文件后缀
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize 1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout 0 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize 100 向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.codeC 压缩算法。可选值:gzipbzip2lzolzop 、 ``snappy`
hdfs.fileType SequenceFile 文件格式,目前支持: SequenceFileDataStreamCompressedStream 。 1. DataStream 不会压缩文件,不能设置hdfs.codeC 2. CompressedStream 必须设置可用的hdfs.codeC参数
hdfs.maxOpenFiles 5000 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。
hdfs.writeFormat Writable 文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Impala 或 Hive 无法读取这些文件。
hdfs.callTimeout 10000 允许HDFS操作文件的时间(单位:毫秒),比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。
hdfs.threadsPoolSize 10 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.rollTimerPoolSize 1 每个HDFS Sink实例调度定时文件滚动的线程数
hdfs.kerberosPrincipal 用于安全访问 HDFS 的 Kerberos 用户主体
hdfs.kerberosKeytab 用于安全访问 HDFS 的 Kerberos keytab 文件
hdfs.proxyUser 代理名
hdfs.round false 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
hdfs.roundValue 1 向下舍入到小于当前时间的最高倍数,默认为1,范围为1~60(使用hdfs.roundUnit配置的单位)。例子:假设当前时间是18:32:01,hdfs.roundUnit = minute,如果 roundValue = 5,则时间戳会取为 18:30;如果 roundValue=7,则时间戳会取为 18:28;如果 roundValue=10,则时间戳会取为:18:30。
hdfs.roundUnit second 向下舍入时的单位,可选值: secondminutehour
hdfs.timeZone Local Time 解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai
hdfs.useLocalTimeStamp false 使用日期时间转义符时是否使用本地时间戳(而不是使用 event header 中自带的时间戳)
hdfs.closeTries 0 开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。
如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;
如果设置为0,Sink会一直尝试重命名文件直到成功为止;
关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。
hdfs.retryInterval 180 连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。
serializer TEXT Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。
serializer.* 根据上面 serializer 配置的类型来根据需要添加序列化器的参数

Memory Channel

Memory Channel,组件 type 为 memory

  • event 存储在内存队列中,该队列可配置最大值。
  • 对于需要更高吞吐量、允许在 agent 发生故障时丢失分段数据的流而言,它是理想的选择。
属性 默认值 解释
type 组件类型,这个是: memory
capacity 100 Channel 中存储 event 的最大数量
transactionCapacity 100 每次事务中可以从 source 中拿到或者送到 sink 中的 event 的最大数量(不能比 capacity 大)
keep-alive 3 添加或删除一个 event 的超时时间,单位:秒
byteCapacityBufferPercentage 20 指定 event header 所占空间大小与 channel 中所有 event 的总大小之间的百分比
byteCapacity see description Channel 中最大允许存储所有 event 的总字节数。默认值为 JVM 可用最大内存的80%(即命令行上传递的 -Xmx 值的80%)。 计算总字节时只计算 event body,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当一个 JVM 里面有多个 Memory Channel,并且它们恰好持有相同的物理 event(例如:这些 Channel 通过复制通道选择器接收同一个 Source 中的 event),此时这些 event 占用的空间是累加的,而不会只计算一次。如果这个值设置为 0(不限制),就会达到 200G 左右的内部硬件限制。

如何理解 byteCapacity 和 byteCapacityBufferPercentage?

举两个例子来帮助理解最后两个参数,两个例子都有共同的前提:假设 JVM 最大的可用内存是 100 M(或者说 JVM 启动时指定了 `-Xmx=100m`)。

例子1: byteCapacityBufferPercentage 设置为 20byteCapacity 设置为 52428800(即 50 M),Memory Channel 可用内存是 50 M,此时内存中所有 event body 的总大小就被限制为 50 M *(1-20%) = 40 M。

例子2: byteCapacityBufferPercentage 设置为 10byteCapacity 不设置,Memory Channel 可用内存是 80 M,此时内存中所有 event body 的总大小就被限制为 100 M * 80% *(1-10%) = 72 M。

File Channel

File Channel,组件 type 为 file

  • 默认情况下,File Channel 使用默认的用户主目录内的 checkpoint 目录和数据目录的路径。因此:1.如果 Agent 中有多个活动的 File Channel 实例,则只有一个能够锁定目录并导致其他的 Channel 初始化失败;2.因此有必要为所有已配置的 Channel 显式配置不同的 checkpoint 目录,最好是在不同的磁盘上。
  • 由于 File Channel 将在每次提交后同步到磁盘,因此将其与和 event 一起批处理的 sink/source 耦合在一起可能很有必要,以便在多个磁盘不可用于 checkpoint 和数据目录时提供良好的性能。
Property Name Default Description
type 组件类型,这个是: file
checkpointDir ~/.flume/file-channel/checkpoint 存储 checkpoint 的目录
useDualCheckpoints false 是否备份 checkpoint。如果设置为 true必须设置 backupCheckpointDir 参数
backupCheckpointDir 备份 checkpoint 的目录。 此目录不能与数据目录或 checkpointDir 相同
dataDirs ~/.flume/file-channel/data 用逗号分隔的目录列表,用于存储日志文件。在不同的磁盘上使用多个目录可以提高 File Channel 的性能
transactionCapacity 10000 Channel 支持的单个事务的最大容量
checkpointInterval 30000 检查点的时间间隔(毫秒)
maxFileSize 2146435071 单个日志文件的最大字节数。这个默认值约等于 2047 MB
minimumRequiredSpace 524288000 最小空闲空间的字节数。为了避免数据损坏,当空闲空间低于这个值的时候,File Channel 将拒绝一切存取请求
capacity 1000000 Channel 的最大容量
keep-alive 3 存入 event 的最大等待时间(秒)
use-log-replay-v1 false (专家)是否使用老的回放逻辑 (Flume 默认是使用v2版本的回放方法,但是如果v2版本不能正常工作可以考虑通过这个参数改为使用v1版本,v1版本是从Flume1.2开始启用的,回放是指系统关闭或者崩溃前执行的校验检查点文件和文件channel记录是否一致程序)
use-fast-replay false (专家)是否开启快速回放(不使用队列)
checkpointOnClose true 控制 Channel 关闭时是否创建 checkpoint。在关闭位置创建 checkpoint 可以避免回放,从而加快了 File Channel 的后续启动速度
encryption.activeKey 加密数据所使用的 key 名称
encryption.cipherProvider 加密类型,支持的类型:AESCTRNOPADDING
encryption.keyProvider 密钥提供者类型,支持的类型: JCEKSFILE
encryption.keyProvider.keyStoreFile keystore 文件路径
encrpytion.keyProvider.keyStorePasswordFile keystore 密码文件路径
encryption.keyProvider.keys 所有 key 的列表,包含所有使用过的加密key名称
encyption.keyProvider.keys.*.passwordFile 可选的秘钥密码文件路径

配置采集方案

创建 spooldir-hdfs.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/spooldir.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 命名此agent上的各组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置Source组件(spooldir)
a1.sources.r1.type = spooldir
# 配置监视目录
a1.sources.r1.spoolDir = /export/data/dirfile
# 是否添加文件的绝对路径到header
a1.sources.r1.fileHeader = true

# 描述和配置Sink组件(hdfs)
a1.sinks.k1.type = hdfs
# 路径支持转义字符串
a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
# Flume写入hdfs时在文件名前加入的前缀,默认为FlumeData
a1.sinks.k1.hdfs.filePrefix = events-
# 是否应将时间戳向下舍入(如果为true,则影响除“%t”之外的所有基于时间的转义符)
a1.sinks.k1.hdfs.round = true
# 向下舍入到小于当前时间的最高倍数,默认为1,范围为1~60(使用hdfs.roundUnit配置的单位)
a1.sinks.k1.hdfs.roundValue = 10
# 向下舍入时的单位,默认为second(second/minute/hour)
a1.sinks.k1.hdfs.roundUnit = minute
# 间隔多长时间触发滚动(临时文件转为最终目标文件,并创建新文件),默认为30(0表示不根据时间间隔滚动)
a1.sinks.k1.hdfs.rollInterval = 3
# 触发滚动的文件大小,以字节为单位,默认为1024(0表示不根据文件大小滚动)
a1.sinks.k1.hdfs.rollSize = 20
# 触发滚动的event数量,默认为10(0表示不根据event数量滚动)
a1.sinks.k1.hdfs.rollCount = 5
# 每个批次刷新到HDFS上的文件中的event数量,默认为100
a1.sinks.k1.hdfs.batchSize = 1
# 替换转义字符串时,使用本地时间(而不是event header中的时间戳),默认为false
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 生成的文件类型,默认为Sequencefile,可选DataStream(不支持压缩)和CompressedStream(支持压缩,需设置正确的hdfs.codeC)
a1.sinks.k1.hdfs.fileType = DataStream

# 描述和配置Channel组件(memory)
a1.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
a1.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
a1.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Flume

启动 Flume,等待往监视目录添加文件:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -c conf/ -f conf/spooldir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

移动文件到目录

再建立一个 node03 的连接,创建三个测试文件 test1.txttest1.txttest1.txt

移动 test1.txt 到监视目录,观察控制台输出:

移动test1

同时将test1.txttest1.txt两个文件移动到监视目录,观察控制台输出:

移动test2test3

查看 HDFS 中的文件,打印文件内容:

查看HDFS文件

采集文件到HDFS

需求

某业务系统的日志内容不断增加,需要把日志文件中追加的数据实时采集到 HDFS。

分析

Source 使用 exec,Sink 使用 hdfs,Channel 可用 memoryfile

Exec Source

Exec Source,组件 type 为 exec

  • 启动时会运行给定的 Unix 命令(如 cat [named pipe]tail -f [file]),该 Unix 命令往往是能在标准输出上连续产生数据(而 stderr 会直接被丢弃 ,除非将属性 logStdErr 设置为 true)。如果该 Unix 命令由于某种原因而退出,则该 Source 也将退出并且将不再产生任何数据(如 date 命令不能连续产生数据流,在产生数据后便退出)。
  • exec无法保证是可靠地。比如使用 tail -f [file] 时,Flume 会将指定文件的每一行作为 event 发送到 Channel,但如果 Channel 已满,数据将会丢失。想要获得更高的可靠性,建议使用 Spooling Directory Source、Taildir Source 或通过 SDK 与 Flume 直接集成。
Property Name Default Description
channels
type 组件类型,这个是: exec
command 要执行的命令,一般是 cattail
shell 设置用于运行命令的 shell,例如 /bin/sh -c。仅适用于依赖 shell 功能的命令,例如:通配符、back ticks、管道等
restartThrottle 10000 尝试重新启动之前等待的时间,单位:毫秒
restart false 如果执行的命令挂掉,是否重新启动
logStdErr false 是否记录命令的 stderr
batchSize 20 单次读取并发送到 Channel 的最大行数
batchTimeout 3000 在未达到缓冲区大小的情况下,将数据推送到下游之前要等待的时间,单位:毫秒
selector.type replicating 可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.* Channel 选择器的相关属性,取决于 selector.type 的值
interceptors 以空格分隔的拦截器列表
interceptors.* 拦截器相关的属性配置

配置采集方案

创建 tailfile-hdfs.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/tailfile-hdfs.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Name the components on this agent
1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# 要执行的命令
a1.sources.r1.command = tail -f /export/data/tailfile/access_log
a1.sources.r1.channels = c1

# Describe k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/exec/tailfile/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = access_log
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 120
a1.channels.c1.capacity = 500000
a1.channels.c1.transactionCapacity = 600

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Flume

启动 Flume,等待往文件追加数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -c conf -f conf/tailfile-hdfs.conf -n agent1  -Dflume.root.logger=INFO,console

追加文件内容

创建 Shell 脚本文件 tailfile.sh,用来往文件中追加内容:

/export/data/tailfile/tailfile.sh
1
2
3
4
5
6
#!/bin/bash
while true
do
date >> /export/data/tailfile/access_log;
sleep 0.5;
done

执行脚本,观察控制台输出:

Flume_exec_flume-ng

查看 HDFS 中的文件,打印文件内容:

Flume_exec_ls

级联

需求

第一个 agent 负责采集日志文件中的数据,并通过网络发送到第二个 agent;第二个 agent 负责接收第一个 agent 发送的数据,并将数据保存到 HDFS。两个 agent 位于两个不同的服务器。

分析

agent1:位于 node02,Source 使用 exec,Sink 使用 avro,Channel 使用 memory

agent2:位于 node03,Source 使用 avro,Sink 使用 hdfs,Channel 使用 memory

Flume级联方案

Avro Source

Avro Source,组件 type 为 avro

  • 监听 Avro 端口,并从外部 Avro 客户端流接收 event。
  • 与另一个 Flume agent 上的内置 Avro Sink 配对时,可以创建出分层的采集拓扑结构。
Property Name Default Description
channels
type 组件类型,这个是: avro
bind 监听的服务器名 hostname 或 ip
port 监听的端口
threads 产生的最大工作线程数
selector.type 可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.* Channel 选择器的相关属性,取决于 selector.type 的值
interceptors 以空格分隔的拦截器列表
interceptors.*
compression-type none 压缩类型,可选值: nonedeflate 。这个类型必须跟上一级的 AvroSink 相匹配
ssl false 将此设置为 true 以启用SSL加密,同时还必须指定 keystorekeystore-password
keystore Java keystore 文件的路径。SSL 必需
keystore-password Java keystore 的密码。SSL 必需
keystore-type JKS Java keystore 的类型。可以是 JKSPKCS12
exclude-protocols SSLv3 要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议外,还将始终排除SSLv3
ipFilter false 将此设置为 true 可启用 netty 的 ipFiltering
ipFilterRules 使用此配置定义N个 netty ipFilter 模式规则

Avro Sink

Avro Sink,组件 type 为 avro

  • 该 Sink 用于构成 Flume 分层采集拓扑结构的另一半。发送到该 Sink 的 Flume event 将转换为 Avro event,并发送到已配置的主机名/端口。这些 event 是从已配置的 Channel 中以已配置的批次大小批量获取的。
Property Name Default Description
channel
type 组件类型,这个是: avro
hostname 要绑定的 hostname 或 IP 地址
port 要监听的端口号
batch-size 100 每批量发送的 event 数量
connect-timeout 20000 第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout 20000 请求超时时间,单位:毫秒
reset-connection-interval none 重置与下一跳的连接之前的时间(秒)。这将强制 Avro Sink 重新连接到下一跳。这将允许 Sink 在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent
compression-type none 压缩类型,可选值: nonedeflate 。这个类型必须跟下一级的 AvroSource 相匹配
compression-level 6 event 的压缩级别。0 = 无压缩,1-9 为压缩。数字越高,压缩越多
ssl false 设置为 true 可以为此 AvroSink 启用 SSL。配置 SSL 时,可以选择设置 truststoretruststore-passwordtruststore-type,并指定是否 trust-all-certs
trust-all-certs false 如果将其设置为 true,将不检查远程服务器(Avro Source)的 SSL 服务器证书。这不应在生产中使用,因为它使攻击者更容易执行中间人攻击并“监听”加密的连接
truststore 定制 Java truststore 文件的路径。Flume 使用此文件中的证书颁发机构信息来确定是否应信任远程 Avro Source 的 SSL 身份验证凭据。如果未指定,将使用默认的 Java JSSE 证书颁发机构文件(在 Oracle JRE 中通常为 jssecacertscacerts
truststore-password 指定的 truststore 的密码
truststore-type JKS Java truststore 的类型。可以是 JKS 或其他受支持的 Java truststore 类型
exclude-protocols SSLv3 要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议外,还将始终排除 SSLv3
maxIoWorkers 2 * 机器中可用处理器的数量 I/O 工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的

配置采集方案

node02 创建 tailfile-avro.conf

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/tailfile-avro.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 命名此agent上的各组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置Source组件(exec)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/tailfile/access_log
a1.sources.r1.channels = c1

# 描述和配置Sink组件(avro)
# Sink端的avro组件是一个数据发送者
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.153.102
a1.sinks.k1.port = 4396
a1.sinks.k1.batch-size = 100

# 描述和配置Channel组件(memory)
a1.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
a1.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
a1.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

node03 创建 avro-hdfs.conf

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/avro-hdfs.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 命名此agent上的各组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置Source组件(avro)
# Source中的avro组件是一个数据接收者
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.153.102
a1.sources.r1.port = 4396

# 描述和配置Sink组件(hdfs)
a1.sinks.k1.type = hdfs
# 路径支持转义字符串
a1.sinks.k1.hdfs.path = hdfs://node01:8020/avro/tailfile/%y-%m-%d/%H%M/
# Flume写入hdfs时在文件名前加入的前缀,默认为FlumeData
a1.sinks.k1.hdfs.filePrefix = access_log
# 是否应将时间戳向下舍入(如果为true,则影响除“%t”之外的所有基于时间的转义符)
a1.sinks.k1.hdfs.round = true
# 向下舍入到小于当前时间的最高倍数,默认为1,范围为1~60(使用hdfs.roundUnit配置的单位)
a1.sinks.k1.hdfs.roundValue = 10
# 向下舍入时的单位,默认为second(second/minute/hour)
a1.sinks.k1.hdfs.roundUnit = minute
# 间隔多长时间触发滚动(临时文件转为最终目标文件,并创建新文件),默认为30(0表示不根据时间间隔滚动)
a1.sinks.k1.hdfs.rollInterval = 3
# 触发滚动的文件大小,以字节为单位,默认为1024(0表示不根据文件大小滚动)
a1.sinks.k1.hdfs.rollSize = 20
# 触发滚动的event数量,默认为10(0表示不根据event数量滚动)
a1.sinks.k1.hdfs.rollCount = 5
# 每个批次刷新到HDFS上的文件中的event数量,默认为100
a1.sinks.k1.hdfs.batchSize = 1
# 替换转义字符串时,使用本地时间(而不是event header中的时间戳),默认为false
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 生成的文件类型,默认为Sequencefile,可选DataStream(不支持压缩)和CompressedStream(支持压缩,需设置正确的hdfs.codeC)
a1.sinks.k1.hdfs.fileType = DataStream

# 描述和配置Channel组件(memory)
a1.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
a1.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
a1.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动Flume

先启动 node03 上的 Flume,等待和 node02 建立连接,以及接收数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -c conf/ -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

Flume_avro_node03_flume-ng

后启动 node02 上的 Flume,等待采集数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -c conf/ -f conf/tailfile-avro.conf -n a1 -Dflume.root.logger=INFO,console

Flume_avro_node02_flume-ng

追加文件内容

创建 Shell 脚本文件 tailfile.sh,用来往文件中追加内容。

执行脚本,观察 node03 控制台输出:

Flume_avro_node03_flume-ng_receivedata

查看 HDFS 中的文件,打印文件内容:

Flume_avro_cat

可以发现数据存放在两个目录下,原因是在 avro-hdfs.confhdfs Sink 的配置:

  1. hdfs.path 的路径中配置了 .../%H%M/,分钟作为转义字符串出现在目录的路径中。
  2. 通过 hdfs.roundValuehdfs.roundUnit 配置了对时间戳以 10 min 为单位向下舍去。

故障转移

需求

使用 Flume NG 本身提供的 failover 机制,搭建一个高可用的 Flume NG 集群,实现自动切换和恢复。

分析

使用三个节点搭建 Flume 的高可用集群,角色分配如下:

名称 主机 角色
Agent1 node01 采集日志文件,将采集数据发送到 Agent2 或 Agent3
Agent2 node02 主 Collector,优先接收 Agnet1 采集的数据,存入 HDFS
Agent3 node03 备 Collector,如果主 Collector 发生故障,代替接收 Agnet1 采集的数据,存入 HDFS

第一个 agent:位于 node01,Source 使用 exec,Sink 使用两个 avro 分别连接 Agent2 和 Agent3,Channel 使用 memory 。此外还配置 Sink 组逻辑处理器(Sink Processors),类型设置为 failover

第二个 agent:位于 node02,Source 使用 avro,Sink 使用 hdfs,Channel 使用 memory

第三个 agent:位于 node03,配置和 node02 上的一样 。

Failover Sink Processor

Failover Sink Processor,组件 type 为 failover

  • Failover Sink Processor 维护了一个 Sink 的优先级列表,以确保发送的 event 能够得到处理(交付)。
  • 故障转移机制的工作原理是给故障的 Sink 分配一个退避时间并下放到“冷却池”,退避时间会随着故障次数的增加而增加(直到设置的最大退避时间)。 Sink 成功发送 event 后,它将恢复到“存活池”。Sink 有着与之关联优先级,数值越大,优先级越高。 如果一个 Sink 在发送 event 时发生故障,会尝试下一个优先级最高的 Sink。例如,优先级为 100 的 Sink 会先于优先级为 80 的 Sink 被激活。如果未指定优先级,则根据 Sink 的配置顺序来选取。
  • 要使用故障转移选择器,不仅要设置 Sink 组处理器的 type 为 failover,还要为每一个 Sink 设置一个唯一的优先级数值。 可以使用 maxpenalty 属性设置故障转移时间的上限(毫秒)。
属性 默认值 解释
sinks 这一组所有 Sink 的名字,多个用空格分开
processor.type default 组件类型,这个是: failover
processor.priority.<sinkName> 组内 Sink 的权重值,<sinkName> 必须是当前组关联的 Sink 之一。数值(绝对值)越高越早被激活
processor.maxpenalty 30000 发生异常的 Sink 的最大退避时间(毫秒)

Agent1

创建 failover-agent1.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/failover-agent1.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 命名此agent上的各组件
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

# 描述和配置Source组件(exec)
agent1.sources.r1.type = exec
# 配置监视目录
agent1.sources.r1.command = tail -F /export/data/tailfile/access_log

# 描述和配置Sink组件(avro)
# set sink1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 7777
# set sink2
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 7777

# 设置拦截器
#agent1.sources.r1.interceptors = i1 i2
#agent1.sources.r1.interceptors.i1.type = static
#agent1.sources.r1.interceptors.i1.key = Type
#agent1.sources.r1.interceptors.i1.value = LOGIN
#agent1.sources.r1.interceptors.i2.type = timestamp

# 设置Sink组逻辑处理器
agent1.sinkgroups = g1
# 设置该组包含的Sink
agent1.sinkgroups.g1.sinks = k1 k2
# 设置类型为故障转移
agent1.sinkgroups.g1.processor.type = failover
# 设置sink1的权重,权重高的为主Collector
agent1.sinkgroups.g1.processor.priority.k1 = 10
# 设置sink1的权重,权重低的为备Collector
agent1.sinkgroups.g1.processor.priority.k2 = 1
# 发生异常的 Sink 的最大退避时间(毫秒)
agent1.sinkgroups.g1.processor.maxpenalty = 10000

# 描述和配置Channel组件(memory)
agent1.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent1.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent1.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1

Agent2

创建 failover-agent2.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/failover-agent2.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 命名此agent上的各组件
agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1

# 描述和配置Source组件(avro)
agent2.sources.r1.type = avro
agent2.sources.r1.bind = node02
agent2.sources.r1.port = 7777

# 描述和配置Sink组件(hdfs)
agent2.sinks.k1.type = hdfs
# 路径支持转义字符串
agent2.sinks.k1.hdfs.path = hdfs://node01:8020/flume/failover/
agent2.sinks.k1.hdfs.fileType = DataStream
agent2.sinks.k1.hdfs.writeFormat = TEXT
agent2.sinks.k1.hdfs.rollInterval = 10
agent2.sinks.k1.hdfs.filePrefix = %Y-%m-%d

# 设置拦截器
#agent2.sources.r1.interceptors = i1
#agent2.sources.r1.interceptors.i1.type = static
#agent2.sources.r1.interceptors.i1.key = Collector
#agent2.sources.r1.interceptors.i1.value = node02

# 描述和配置Channel组件(memory)
agent2.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent2.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent2.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1

Agent3

创建 failover-agent3.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/failover-agent3.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 命名此agent上的各组件
agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1

# 描述和配置Source组件(avro)
agent2.sources.r1.type = avro
agent2.sources.r1.bind = node02
agent2.sources.r1.port = 7777

# 描述和配置Sink组件(hdfs)
agent2.sinks.k1.type = hdfs
# 路径支持转义字符串
agent2.sinks.k1.hdfs.path = hdfs://node01:8020/flume/failover/
agent2.sinks.k1.hdfs.fileType = DataStream
agent2.sinks.k1.hdfs.writeFormat = TEXT
agent2.sinks.k1.hdfs.rollInterval = 10
agent2.sinks.k1.hdfs.filePrefix = %Y-%m-%d

# 设置拦截器
#agent2.sources.r1.interceptors = i1
#agent2.sources.r1.interceptors.i1.type = static
#agent2.sources.r1.interceptors.i1.key = Collector
#agent2.sources.r1.interceptors.i1.value = node03

# 描述和配置Channel组件(memory)
agent2.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent2.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent2.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1

启动Flume

先启动 node02 和 node03 上的 Flume,等待和 node01 建立连接,以接收数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
2
3
bin/flume-ng agent -n agent3 -c conf/ -f conf/agent3.conf -Dflume.root.logger=DEBUG,console

bin/flume-ng agent -n agent2 -c conf/ -f conf/agent2.conf -Dflume.root.logger=DEBUG,console

后启动 node01 上的 Flume,等待采集数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -n agent1 -c conf/ -f conf/agent1.conf -Dflume.root.logger=DEBUG,console

追加文件内容

在 node01 执行 Shell 脚本文件 tailfile.sh,用来往文件中追加内容。

由于 Agent2 的权重大,观察到 node02 的控制台中输出了文件采集到 HDFS 的信息。此时关闭 Agent2,会由 Agent3 代替执行相应的工作。如果再次启动 Agent2,相应的采集工作会由 Agent3 转移到 Agent2。

打印 HDFS 中的文件列表,发现在下图框起来的部分出现了两次断层,这三部分文件分别由 Agent2、Agent3、Agent2 Sink 到 HDFS 的:

Flume_failover_ls_ps

查看文件内容,发现第一次断层处出现了数据丢失,第二次断层处没有数据丢失,说明在故障转移过程中、 Exec Source 在执行 tail 命令时,存在数据丢失现象。而且是在 Agent3 接替因故障离线的 Agent2 时出现的数据丢失;当 Agent2 恢复使用,采集任务平稳地从 Agent3 转移到 Agent2 时,没有出现数据丢失的情况。

Agent2 转移到 Agent3:

Flume_failover_cat_node02to03

Agent3 转移到 Agent2:

Flume_failover_cat_node03to02

负载均衡

需求

使用 Flume NG 本身提供的 load balance 机制,搭建一个负载均衡的 Flume NG 集群,实现负载的轮询分发。

分析

使用三个节点搭建 Flume 的高可用集群,角色分配如下:

名称 主机 角色
Agent1 node01 采集日志文件,将采集数据发送到 Agent2 或 Agent3
Agent2 node02 Collector,接收 Agnet1 采集的数据,存入 HDFS
Agent3 node03 Collector,接收 Agnet1 采集的数据,存入 HDFS

第一个 agent:位于 node01,Source 使用 exec,Sink 使用两个 avro 分别连接 Agent2 和 Agent3,Channel 使用 memory 。此外还配置 Sink 组逻辑处理器(Sink Processors),类型设置为 load_balance

第二个 agent:位于 node02,Source 使用 avro,Sink 使用 logger,Channel 使用 memory

第三个 agent:位于 node03,配置和 node02 上的一样 。

Load balancing Sink Processor

Load balancing Sink Processor,组件 type 为 load_balance

  • Load balancing Sink Processor 提供了负载均衡多个 Sink 上的流量的功能。 它维护着一个必须在其上分配负载的活动 Sink 列表的索引。 支持轮询(round_robin)和随机(random)两种选择机制来分配负载。选择机制默认为 round_robin ,可以通过配置更改,也支持继承了 AbstractSinkSelector 的自定义选择器。
  • 调用时,此选择器使用其配置的选择机制选择下一个 Sink 并调用它。如果所选 Sink 无法正常工作,则处理器通过其配置的选择机制选择下一个可用 Sink。 此实现不会将失败的 Sink 列入黑名单,而是继续乐观地尝试每个可用的 Sink。如果所有 Sink 的调用结果都为失败,选择器会将故障抛给 sink runner。
  • 如果 backoff 设置为 true,失败的 Sink 会被列入黑名单,达到一定的超时时间后再自动从黑名单放出。 如从黑名单出来后 Sink 仍然无法响应,则再次进入黑名单时设定的超时时间会翻倍,以免陷入对无响应 Sink 的长时间等待中。 如果禁用此功能,所有失败的 Sink 的负载都将传递到线路中的下一个 Sink,无法均衡地平衡。
属性 默认值 解释
processor.sinks 这一组所有 Sink 的名字,多个用空格分开
processor.type default 组件类型,这个是: load_balance
processor.backoff false 退避机制
processor.selector round_robin 选择机制,可选值:round_robin (轮询)、random(随机)、「自定义类的FQCN」:继承了 AbstractSinkSelector 的自定义选择器
processor.selector.maxTimeOut 30000 发生异常的Sink的最长退避时间(毫秒)

Agent1

创建 load_balance-agent1.conf,配置采集方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 命名此agent上的各组件
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

# 描述和配置Source组件(exec)
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/data/tailfile/access_log

# 描述和配置sink组件(avro)
# set sink1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 7777
# set sink2
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 7777

# 设置Sink组逻辑处理器
agent1.sinkgroups = g1
# 设置该组包含的Sink
agent1.sinkgroups.g1.sinks = k1 k2
# 设置类型为负载均衡
agent1.sinkgroups.g1.processor.type = load_balance
# 启用退避。失败的 Sink 会被列入黑名单,达到一定的超时时间后再自动从黑名单放出
agent1.sinkgroups.g1.processor.backoff = true
# 负载均衡机制,可选值:round_robin(轮询)、random(随机)
agent1.sinkgroups.g1.processor.selector = round_robin
# 发生异常的Sink最长退避时间(毫秒)
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# 描述和配置Channel组件(memory)
agent1.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent1.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent1.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1

Agent2

创建 load_balance-agent2.conf,配置采集方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 命名此agent上的各组件
agent2.sources = r1
agent2.sinks = k1
agent2.channels = c1

# 描述和配置Source组件(avro)
agent2.sources.r1.type = avro
agent2.sources.r1.bind = node02
agent2.sources.r1.port = 7777

# 描述和配置Sink组件(logger)
agent2.sinks.k1.type = logger

# 描述和配置Channel组件(memory)
agent2.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent2.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent2.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1

Agent3

创建 load_balance-agent3.conf,配置采集方案:

/load_balance-agent3.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 命名此agent上的各组件
agent3.sources = r1
agent3.sinks = k1
agent3.channels = c1

# 描述和配置Source组件(avro)
agent3.sources.r1.type = avro
agent3.sources.r1.bind = node03
agent3.sources.r1.port = 7777

# 描述和配置Sink组件(logger)
agent3.sinks.k1.type = logger

# 描述和配置Channel组件(memory)
agent3.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent3.channels.c1.capacity = 1000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent3.channels.c1.transactionCapacity = 100

# 将Source和Sink绑定到Channel
agent3.sources.r1.channels = c1
agent3.sinks.k1.channel = c1

启动Flume

先启动 node02 和 node03 上的 Flume,等待和 node01 建立连接,以接收数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
2
3
bin/flume-ng agent -n agent3 -c conf/ -f conf/load_balance-agent3.conf -Dflume.root.logger=DEBUG,console

bin/flume-ng agent -n agent2 -c conf/ -f conf/load_balance-agent2.conf -Dflume.root.logger=DEBUG,console

后启动 node01 上的 Flume,等待采集数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -n agent1 -c conf/ -f conf/load_balance-agent1.conf -Dflume.root.logger=DEBUG,console

追加文件内容

在 node01 执行 Shell 脚本文件 tailtime.sh,用来往文件中追加内容:

/export/data/tailfile/tailtime.sh
1
2
3
4
5
6
#!/bin/bash
while true
do
date +%H:%M:%S >> /export/data/tailfile/access_log;
sleep 1;
done

观察 node02 和 node03 控制台的输出,可见 Agent2 和 Agent3 交替接收与处理 Agent1 采集来的数据:

Flume_load_balance_node02_logger

Flume_load_balance_node03_logger

拦截器

Flume 支持在运行时对 event 进行修改或丢弃,可以通过拦截器来实现。Flume 里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor 接口的类。拦截器可以根据开发者的意图随意修改甚至丢弃 event。Flume 也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了。拦截器的顺序取决于它们被初始化的顺序(实际也就是配置的顺序),event 就这样按照顺序经过每一个拦截器,如果想在拦截器里面丢弃 event, 在传递给下一级拦截器的 list 里面把它移除就行了。如果想丢弃所有的 event,返回一个空集合就行了。

需求

把 node01、node02 两台服务器中的日志文件 access.lognginx.logweb.log 采集汇总到 node03 并分类收集到 HDFS 中,要求在 HDFS 中的存储路径如下:

1
2
3
/flume/logs/access/20200220/events.1582189038642
/flume/logs/nginx/20200220/events.1582189039416
/flume/logs/web/20200220/events.1582189038810

分析

三个节点的角色分配如下:

名称 主机 角色
Agent1 node01 采集日志文件,将采集数据发送到 Agent3
Agent2 node02 采集日志文件,将采集数据发送到 Agent3
Agent3 node03 Collector,接收采集的数据,存入 HDFS

Agent1:位于 node01,Source 使用 exec,Sink 使用 avro 连接到 Agent3,Channel 使用 memory 。此外还配置静态拦截器(Static Interceptor),在 event header 中记录采集文件的类型。

Agent2:位于 node02,配置和 node01 上的一样 。

Agent3:位于 node03,Source 使用 avro,Sink 使用 hdfs,Channel 使用 memory

Static Interceptor

Static Interceptor,组件 type 为 static

  • Static Interceptor 可以向 event header 中写入一个固定的键值对属性。
  • 这个拦截器目前不支持写入多个属性,但是你可以通过配置多个 Static Interceptor 来实现。
属性 默认值 解释
type 组件类型,这个是: static
preserveExisting true 如果header中已经存在同名的属性是否保留
key key 写入header的key
value value 写入header的值

Agent1

创建 interceptors-agent1.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/interceptors-agent1.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 命名此agent上的各组件
agent1.sources = r1 r2 r3
agent1.sinks = k1
agent1.channels = c1

# 描述和配置Source组件(exec)
# set sources1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/data/logs/access.log
# 设置拦截器(static)
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = static
# 使用静态拦截器向 event header 中写入一个固定的key-value对
agent1.sources.r1.interceptors.i1.key = type
agent1.sources.r1.interceptors.i1.value = access
# set sources2
agent1.sources.r2.type = exec
agent1.sources.r2.command = tail -F /export/data/logs/nginx.log
agent1.sources.r2.interceptors = i2
agent1.sources.r2.interceptors.i2.type = static
agent1.sources.r2.interceptors.i2.key = type
agent1.sources.r2.interceptors.i2.value = nginx
# set sources3
agent1.sources.r3.type = exec
agent1.sources.r3.command = tail -F /export/data/logs/web.log
agent1.sources.r3.interceptors = i3
agent1.sources.r3.interceptors.i3.type = static
agent1.sources.r3.interceptors.i3.key = type
agent1.sources.r3.interceptors.i3.value = web

# 描述和配置sink组件(avro)
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node03
agent1.sinks.k1.port = 7777

# 描述和配置Channel组件(memory)
agent1.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent1.channels.c1.capacity = 2000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent1.channels.c1.transactionCapacity = 1000

# 将Source和Sink绑定到Channel
agent1.sources.r1.channels = c1
agent1.sources.r2.channels = c1
agent1.sources.r3.channels = c1
agent1.sinks.k1.channel = c1

Agent2

创建 interceptors-agent2.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/interceptors-agent2.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 命名此agent上的各组件
agent2.sources = r1 r2 r3
agent2.sinks = k1
agent2.channels = c1

# 描述和配置Source组件(exec)
# set sources1
agent2.sources.r1.type = exec
agent2.sources.r1.command = tail -F /export/data/logs/access.log
# 设置拦截器(static)
agent2.sources.r1.interceptors = i1
agent2.sources.r1.interceptors.i1.type = static
# 使用静态拦截器向 event header 中写入一个固定的key-value对
agent2.sources.r1.interceptors.i1.key = type
agent2.sources.r1.interceptors.i1.value = access
# set sources2
agent2.sources.r2.type = exec
agent2.sources.r2.command = tail -F /export/data/logs/nginx.log
agent2.sources.r2.interceptors = i2
agent2.sources.r2.interceptors.i2.type = static
agent2.sources.r2.interceptors.i2.key = type
agent2.sources.r2.interceptors.i2.value = nginx
# set sources3
agent2.sources.r3.type = exec
agent2.sources.r3.command = tail -F /export/data/logs/web.log
agent2.sources.r3.interceptors = i3
agent2.sources.r3.interceptors.i3.type = static
agent2.sources.r3.interceptors.i3.key = type
agent2.sources.r3.interceptors.i3.value = web

# 描述和配置sink组件(avro)
agent2.sinks.k1.type = avro
agent2.sinks.k1.hostname = node03
agent2.sinks.k1.port = 7777

# 描述和配置Channel组件(memory)
agent2.channels.c1.type = memory
# 该通道中最大的可以存储的event数量,默认为100
agent2.channels.c1.capacity = 20000
# 每次可以从Source中拿到或者送到Sink中的event的最大数量,默认为100
agent2.channels.c1.transactionCapacity = 10000

# 将Source和Sink绑定到Channel
agent2.sources.r1.channels = c1
agent2.sources.r2.channels = c1
agent2.sources.r3.channels = c1
agent2.sinks.k1.channel = c1

Agent3

创建 interceptors-agent3.conf,配置采集方案:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf/interceptors-agent3.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 命名此agent上的各组件
agent3.sources = r1
agent3.sinks = k1
agent3.channels = c1

# 描述和配置Source组件(avro)
agent3.sources.r1.type = avro
agent3.sources.r1.bind = node03
agent3.sources.r1.port = 7777

# 设置拦截器(timestamp),向每个event的header中添加一个时间戳属性进去
#agent3.sources.r1.interceptors = i1
#agent3.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# agent3.sources.r1.interceptors.i1.type = timestamp

# 描述和配置sink组件(hdfs)
agent3.sinks.k1.type = hdfs
# 路径支持转义字符串
agent3.sinks.k1.hdfs.path = hdfs://node01:8020/flume/logs/%{type}/%Y%m%d
# Flume写入hdfs时在文件名前加入的前缀,默认为FlumeData
agent3.sinks.k1.hdfs.filePrefix = events
# 生成的文件类型,默认为Sequencefile,可选DataStream(不支持压缩)和CompressedStream(支持压缩,需设置正确的hdfs.codeC)
agent3.sinks.k1.hdfs.fileType = DataStream
# 文件写入格式,默认为Writable,可选Text
agent3.sinks.k1.hdfs.writeFormat = Text
# 替换转义字符串时,使用本地时间(而不是event header中的时间戳),默认为false
agent3.sinks.k1.hdfs.useLocalTimeStamp = true
# 间隔多长时间触发滚动(临时文件转为最终目标文件,并创建新文件),默认为30(0表示不根据时间间隔滚动)
agent3.sinks.k1.hdfs.rollInterval = 30
# 触发滚动的文件大小,以字节为单位,默认为1024(0表示不根据文件大小滚动)
agent3.sinks.k1.hdfs.rollSize = 10485760
# 触发滚动的event数量,默认为10(0表示不根据event数量滚动)
agent3.sinks.k1.hdfs.rollCount = 0
# 每个批次刷新到HDFS上的文件中的event数量,默认为100
agent3.sinks.k1.hdfs.batchSize = 10000
# flume操作hdfs的线程数(包括新建,写入等)
# 每个HDFS Sink实例执行HDFS IO操作(open、write等)时开启的线程数
agent3.sinks.k1.hdfs.threadsPoolSize=10
# 允许HDFS操作的时间(毫秒),比如:open、write、flush、close。如果HDFS操作超时的次数较多,应该适当调高这个这个值
agent3.sinks.k1.hdfs.callTimeout=30000

# 描述和配置Channel组件(memory)
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 20000
agent3.channels.c1.transactionCapacity = 10000

# 将Source和Sink绑定到Channel
agent3.sources.r1.channels = c1
agent3.sinks.k1.channel = c1

启动Flume

先启动 node03 上的 Flume,等待和 node01、node02 建立连接,以接收数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
bin/flume-ng agent -n agent3 -c conf -f conf/interceptors-agent3.conf -Dflume.root.logger=DEBUG,console

后启动 node01 上的 Flume,等待采集数据:

/export/servers/apache-flume-1.6.0-cdh5.14.0-bin/
1
2
3
bin/flume-ng agent -n agent1 -c conf -f conf/interceptors-agent1.conf -Dflume.root.logger=DEBUG,console

bin/flume-ng agent -n agent2 -c conf -f conf/interceptors-agent2.conf -Dflume.root.logger=DEBUG,console

追加文件内容

在 node01 和 node02 执行 Shell 脚本文件 tailtimes.sh,用来往文件中追加内容:

/export/data/tailfile/tailtimes.sh
1
2
3
4
5
6
7
8
#!/bin/bash
while true
do
date +"$HOSTNAME access "%H:%M:%S >> /export/data/logs/access.log;
date +"$HOSTNAME web "%H:%M:%S >> /export/data/logs/web.log;
date +"$HOSTNAME nginx "%H:%M:%S >> /export/data/logs/nginx.log;
sleep 1;
done

打印 HDFS 中的文件列表,发现根据 header 中的信息在 HDFS 生成了 accessnginxweb 三个目录;查看文件内容,发现来自不同服务器的同类数据成功汇聚进同一个文件:

Flume_interceptor_cat

自定义拦截器

需求

在数据采集之后,通过 Flume 的拦截器,过滤掉不需要的数据,并将指定的第一个字段进行 MD5 加密,加密之后保存 到 HDFS上面。

分析

创建工程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0-cdh5.14.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*/RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

自定义拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package moe.sannaha.iterceptor;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static moe.sannaha.iterceptor.CustomParameterInterceptor.Constants.*;

/*
执行顺序:
1.public static class Builder implements Interceptor.Builder会根据配置(spool-interceptor-hdfs.conf)对:
fields_separator:指明每一行字段的分隔符
indexs:通过分隔符分割后,指明需要那列的字段 下标
indexs_separator:多个下标下标的分隔符
field_index:需要加密的字段下标
进行赋值
2.再通过new CustomParameterInterceptor(fields_separator, indexs, indexs_separator,encrypted_field_index);的方式,
给CustomParameterInterceptor的这四个属性赋值。
3.CustomParameterInterceptor的intercept()方法被调用
4.该方法内先从event封装对象中获取一行数据,之后对该行数据中需要被加密的字段的数据进行加密处理,之后再封装回event封装对象中
*/
public class CustomParameterInterceptor implements Interceptor {
/** The field_separator.指明每一行字段的分隔符 */
private final String fields_separator;

/** The indexs.通过分隔符分割后,指明需要那列的字段 下标*/
private final String indexs;

/** The indexs_separator. 多个下标的分隔符*/
private final String indexs_separator;

/**
*
* @param indexs
* @param indexs_separator
*/
public CustomParameterInterceptor( String fields_separator,
String indexs, String indexs_separator,String encrypted_field_index) {
String f = fields_separator.trim();
String i = indexs_separator.trim();
this.indexs = indexs;
this.encrypted_field_index=encrypted_field_index.trim();
if (!f.equals("")) {
f = UnicodeToString(f);
}
this.fields_separator =f;
if (!i.equals("")) {
i = UnicodeToString(i);
}
this.indexs_separator = i;
}

/*
*
* \t 制表符 ('\u0009') \n 新行(换行)符 (' ') \r 回车符 (' ') \f 换页符 ('\u000C') \a 报警
* (bell) 符 ('\u0007') \e 转义符 ('\u001B') \cx 空格(\u0020)对应于 x 的控制符
*
* @param str
* @return
* @data:2015-6-30
*/

/** The encrypted_field_index. 需要加密的字段下标*/
private final String encrypted_field_index;
public static String UnicodeToString(String str) {
Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
Matcher matcher = pattern.matcher(str);
char ch;
while (matcher.find()) {
ch = (char) Integer.parseInt(matcher.group(2), 16);
str = str.replace(matcher.group(1), ch + "");
}
return str;
}

/*
* @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
* 单个event拦截逻辑
*/
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
//从event这个封装对象中获取一行数据
String line = new String(event.getBody(), Charsets.UTF_8);
String[] fields_spilts = line.split(fields_separator);
//indexs也是从builder构造传入的,表示我们需要获取哪些下标的字段数据
String[] indexs_split = indexs.split(indexs_separator);
String newLine="";
for (int i = 0; i < indexs_split.length; i++) {
int parseInt = Integer.parseInt(indexs_split[i]);
//对加密字段进行加密
if(!"".equals(encrypted_field_index)&&encrypted_field_index.equals(indexs_split[i])){
newLine+=StringUtils.GetMD5Code(fields_spilts[parseInt]);
}else{
newLine+=fields_spilts[parseInt];
}

if(i!=indexs_split.length-1){
newLine+=fields_separator;
}
}
event.setBody(newLine.getBytes(Charsets.UTF_8));
return event;
} catch (Exception e) {
return event;
}
}

/*
* @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
* 批量event拦截逻辑
*/
public List<Event> intercept(List<Event> events) {
List<Event> out = new ArrayList<Event>();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}

/*
* @see org.apache.flume.interceptor.Interceptor#initialize()
*/
public void initialize() {
// TODO Auto-generated method stub

}

/*
* @see org.apache.flume.interceptor.Interceptor#close()
*/
public void close() {
// TODO Auto-generated method stub

}


/**
* 相当于自定义Interceptor的工厂类
* 在flume采集配置文件中通过制定该Builder来创建Interceptor对象
* 可以在Builder中获取、解析flume采集配置文件中的拦截器Interceptor的自定义参数:
* 字段分隔符,字段下标,下标分隔符、加密字段下标 ...等
* @author
*
*/
public static class Builder implements Interceptor.Builder {

/** The fields_separator.指明每一行字段的分隔符 */
private String fields_separator;

/** The indexs.通过分隔符分割后,指明需要那列的字段 下标*/
private String indexs;

/** The indexs_separator. 多个下标下标的分隔符*/
private String indexs_separator;

/** The encrypted_field. 需要加密的字段下标*/
private String encrypted_field_index;
/*
* @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
*/
public void configure(Context context) {
fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
indexs = context.getString(INDEXS, DEFAULT_INDEXS);
indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
encrypted_field_index= context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);
}
/*
* @see org.apache.flume.interceptor.Interceptor.Builder#build()
*/
public Interceptor build() {
return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator,encrypted_field_index);
}
}
/**
* 常量
*/
public static class Constants {
/** The Constant FIELD_SEPARATOR. */
public static final String FIELD_SEPARATOR = "fields_separator";

/** The Constant DEFAULT_FIELD_SEPARATOR. */
public static final String DEFAULT_FIELD_SEPARATOR =" ";

/** The Constant INDEXS. */
public static final String INDEXS = "indexs";

/** The Constant DEFAULT_INDEXS. */
public static final String DEFAULT_INDEXS = "0";

/** The Constant INDEXS_SEPARATOR. */
public static final String INDEXS_SEPARATOR = "indexs_separator";

/** The Constant DEFAULT_INDEXS_SEPARATOR. */
public static final String DEFAULT_INDEXS_SEPARATOR = ",";

/** The Constant ENCRYPTED_FIELD_INDEX. */
public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";

/** The Constant DEFAUL_TENCRYPTED_FIELD_INDEX. */
public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";

/** The Constant PROCESSTIME. */
public static final String PROCESSTIME = "processTime";
/** The Constant PROCESSTIME. */
public static final String DEFAULT_PROCESSTIME = "a";

}
/**
* 工具类:字符串md5加密
*/
public static class StringUtils {
// 全局数组
private final static String[] strDigits = { "0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
// 返回形式为数字跟字符串
private static String byteToArrayString(byte bByte) {
int iRet = bByte;
// System.out.println("iRet="+iRet);
if (iRet < 0) {
iRet += 256;
}
int iD1 = iRet / 16;
int iD2 = iRet % 16;
return strDigits[iD1] + strDigits[iD2];
}

// 返回形式只为数字
private static String byteToNum(byte bByte) {
int iRet = bByte;
System.out.println("iRet1=" + iRet);
if (iRet < 0) {
iRet += 256;
}
return String.valueOf(iRet);
}

// 转换字节数组为16进制字串
private static String byteToString(byte[] bByte) {
StringBuffer sBuffer = new StringBuffer();
for (int i = 0; i < bByte.length; i++) {
sBuffer.append(byteToArrayString(bByte[i]));
}
return sBuffer.toString();
}

public static String GetMD5Code(String strObj) {
String resultString = null;
try {
resultString = new String(strObj);
MessageDigest md = MessageDigest.getInstance("MD5");
// md.digest() 该函数返回值为存放哈希值结果的byte数组
resultString = byteToString(md.digest(strObj.getBytes()));
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace();
}
return resultString;
}
}
}

打包

将自定义拦截器打包得到 flume_interceptor.jar 并放到flume的 lib 目录下。

配置采集方案

创建 custom-interceptor.conf,配置采集方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/intercept
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8

a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type = moe.sannaha.iterceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.83.110:8020/flume/intercept/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/intercept
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8

a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =moe.sannaha.iterceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.83.110:8020/flume/intercept/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60

启动Flume

1
bin/flume-ng agent -c -n a1 conf -f conf/spool-interceptor-hdfs.conf -Dflume.root.logger=DEBUG,console

移动文件到目录

新建测试数据文件,移动到 /export/data/intercept/ 目录下:

/export/data/intercept/data.txt
1
2
3
4
5
6
7
8
9
10
11
12
13601249301	100	200	300	400	500	600	700
13601249302 100 200 300 400 500 600 700
13601249303 100 200 300 400 500 600 700
13601249304 100 200 300 400 500 600 700
13601249305 100 200 300 400 500 600 700
13601249306 100 200 300 400 500 600 700
13601249307 100 200 300 400 500 600 700
13601249308 100 200 300 400 500 600 700
13601249309 100 200 300 400 500 600 700
13601249310 100 200 300 400 500 600 700
13601249311 100 200 300 400 500 600 700
13601249312 100 200 300 400 500 600 700

Flume 1.6.0 User Guide
Flume 1.8用户手册中文版
Using Flume by Hari Shreedharan - Channels
Flume NG:Flume 发展史上的第一次革命

  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/Flume/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!