日志采集框架 Flume 的安装使用。
概述
Cloudera 开发的分布式日志收集系统 Flume,是 Hadoop 周边组件之一。其可以实时的将分布在不同节点、机器上的日志收集到 HDFS 中。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 Cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重。为了解决这些问题,2011 年 10 月 22 号,Cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是 Flume 被纳入 Apache 旗下,Cloudera Flume 改名为 Apache Flume。
- Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,可以适用于大部分的日常数据采集场景。
- Flume 可以采集单个文件、目录下的文件、Socket 数据包、Kafka 等各种形式源数据,又可以将采集到的数据下沉(Sink)到HDFS、HBase、Hive、Kafka 等众多外部存储系统中。
- 一般的采集需求,通过对 Flume 的简单配置即可实现。
- Flume 针对特殊场景也具备良好的自定义扩展能力。
核心组件变化
FLUM OG
FLUM OG 的特点是:
- FLUM OG 有三种角色的节点,如上图:代理节点(Agent)、收集节点(Collector)、主节点(Master)。
- Agent 从各个数据源收集日志数据,将收集到的数据集中到 Collector,然后由收集节点汇总存入 HDFS。master 负责管理 agent,collector 的活动。
- Agent、Collector 都称为 node,node 的角色根据配置的不同分为 logical node(逻辑节点)、physical node(物理节点)。对 logical nodes 和 physical nodes 的区分、配置、使用一直以来都是使用者最头疼的地方。
- Agent、Collector 由 Source、Sink 组成,代表在当前节点数据是从 Source 传送到 Sink。如下图。
FLUM NG
对应于 OG 的特点,FLUM NG 的特点是:
- NG 只有一种角色的节点:代理节点(Agent)。
- 没有 Collector、Master 节点。这是核心组件最核心的变化。
- 去除了 physical nodes、logical nodes 的概念和相关内容。
- Agent 节点的组成也发生了变化。如下图,NG Agent 由 Source、Sink、Channel 组成。
运行机制
Flume 分布式系统中最核心的角色是 agent,Flume 采集系统由一个个 agent 所连接起来形成,每一个 agent 相当于一个数据传递员。Agent 内部有三个组件:
- Source:采集组件,用于跟数据源对接,以获取数据。
- Sink:下沉组件,用于往下一级 agent 传递数据或者往最终存储系统传递数据。
- Channel:传输通道组件,用于从 Source 将数据传递到 Sink。
Flume 传输数据的基本单元是 event。Event 由 header 头信息和载有日志数据的字节数组(body)构成:
- Header 是容纳了 key-value 字符串对的无序集合,key 在集合里是唯一的。通过 header 可以在上下文路由进行扩展。
- Body 中载有的数据对 Flume 来说是不透明的。
安装测试
安装Flume
在 node03 解压安装 Flume:
1 | tar -zxvf flume-ng-1.6.0-cdh5.14.0.tar.gz -C /export/servers/ |
测试需求
作为 Flume 的测试方案,node02 使用 Telent 向 node03 发送一些网络数据,node03 通过 Flume 采集网络数据,并将日志打印在控制台。
分析
Source 使用 netcat
,Sink 使用 logger
,Channel 可用 memory
。
NetCat Source
NetCat Source,组件 type 为 netcat
:
- 监听给定的端口,并将每一行文本转换为一个 event。行为类似于
nc -k -l [host] [port]
。 - 换句话说,它打开指定的端口并监听数据。期望提供的数据是换行符分隔的文本。每行文本都将变成 Flume event,并通过连接的 Channel 发送。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | |
type | – | 组件类型,这个是: netcat |
bind | – | 要监听的 hostname 或 IP 地址 |
port | – | 监听的端口 |
max-line-length | 512 | 解析成 event body 的每行数据的最大字节数 |
ack-every-event | true | 对收到的每个 event 都响应 OK |
selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector.* | Channel 选择器的相关属性,取决于selector.type 的值 |
|
interceptors | – | 该 Source 所使用的拦截器,多个用空格分开 |
interceptors.* | 拦截器相关的属性配置 |
Logger Sink
Logger Sink,组件 type 为 logger
:
- 在 INFO 级别记录 event,通常用于测试、调试。
- 该 Sink 是唯一一个不需要额外配置就能输出 event 原始内容的 Sink。参照 输出原始数据到日志 。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | |
type | – | 组件类型,这个是: logger |
maxBytesToLog | 16 | event body 输出到日志的最大字节数,超出的部分会被丢弃 |
配置测试方案
创建 netcat.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
启动Flume
有了这个配置文件,可以启动 Flume,等待接收消息:
1 | bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console |
-c <conf-dir>
:<conf-dir>
目录需要包括一个 Shell 脚本flume-env.sh
和一个 log4j 配置文件log4j.properties
。-f <conf-file>
:指定采集方案。-n <agent>
:指定 agent。-Dflume.root.logger=INFO,console
:命令中传递了这个 Java 参数,用来在没有自定义环境脚本下强制 Flume 将日志信息显示在控制台。
启动Telnet
在 node02 上使用 Telnet 发送数据:
1 | # 使用Telnet模拟数据发送 |
发送数据:
查看控制台输出:
采集目录到HDFS
需求
某服务器的特定目录下,每当有新文件出现,就把文件采集到HDFS中去。
分析
Source 使用 spooldir
,Sink 使用 hdfs
,Channel 可用 memory
或 file
。
Spooling Directory Source
Spooling Directory Source,组件 type 为 spooldir
:
- 监视目录中出现的新文件,采集新文件中的内容。
- 采集过的文件,会被重命名(默认添加一个“.COMPLETED”后缀)以标示完成。
- Spooling Directory Source 是可靠的,即使 Flume 重新启动或终止也不会丢失数据。为了获取这种可靠性,放入监视目录中的文件名必须唯一。
- Flume 会检测以下问题,如果出现,将会在日志文件中打印错误并停止处理:
- 对放入监视目录的文件进行写入操作。
- 放入监视目录中的文件名不唯一。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | 组件类型,这个是: spooldir |
spoolDir | – | Flume Source监视的目录,该目录下的文件会被 Flume 采集 |
fileSuffix | .COMPLETED | 被 Flume 采集完成的文件被重命名的后缀。1.txt 被Flume采集完成后会重命名为1.txt.COMPLETED |
deletePolicy | never | 是否删除已完成采集的文件,可选值:never 和 immediate |
fileHeader | false | 是否添加文件的绝对路径名(绝对路径+文件名)到 header 中。 |
fileHeaderKey | file | 添加绝对路径名到 header 里面所使用的 key(配合上面的fileHeader 一起使用) |
basenameHeader | false | 是否添加文件名(只是文件名,不包括路径)到 header 中 |
basenameHeaderKey | basename | 添加文件名到 header 里面所使用的 key(配合上面的basenameHeader 一起使用) |
includePattern | ^.*$ | 指定会被采集的文件名的正则表达式,跟下面的 ignorePattern 可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说 ignorePattern 的优先级更高 |
ignorePattern | ^$ | 指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePattern 和 includePattern 两个正则都匹配到,这个文件会被忽略。 |
trackerDir | .flumespool | 用于存储与文件处理有关的元数据的目录。如果此路径不是绝对路径,则将其解释为相对于 spoolDir 的相对路径。 |
consumeOrder | oldest | 设定监视目录内文件的采集顺序。默认是 oldest (也就是修改时间最早的文件最先被采集,如果修改时间相同,采集词典顺序最小的文件),可选值有: oldest 、 youngest 和 random 。当使用 oldest 或 youngest 的时候,Flume 会扫描整个目录以选择“最旧”/“最新”的文件,如果目录中有大量文件,这可能会很慢。当使用 random 时,如果一直有新文件产生,老文件可能要过很久才会被收集 |
pollDelay | 500 | 轮询新文件时使用的延迟,单位:毫秒 |
recursiveDirectorySearch | false | 是否收集子目录下的新文件 |
maxBackoff | 4000 | 如果 Channel 已满,则在两次尝试写入 Channel 之间最长的等待时间(退避时间),单位:毫秒。Source 会从较低的退避时间开始,并在每次 Channel 抛出 ChannelException 后以指数方式增加这个值,直到达到 maxBackoff 指定的值。 |
batchSize | 100 | 批量转移到 Channel 的粒度 |
inputCharset | UTF-8 | 解析器读取文件时使用的编码(解析器会把所有文件当做文本读取) |
decodeErrorPolicy | FAIL |
当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用 Unicode U+FFFD ; IGNORE :删除无法解析的字符。 |
deserializer | LINE |
指定用于将文件解析成 event 的解析器。默认将每行解析为一个 event。所有解析器必须实现 EventDeserializer.Builder 接口 |
deserializer.* | 解析器的相关属性,因 event 解析器而异 | |
bufferMaxLines | – | (Obselete) 现在忽略此选项 |
bufferMaxLineLength | 5000 | (Deprecated) 提交缓冲区中一行的最大长度。请改用deserializer.maxLineLength |
selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector.* | Channel 选择器的相关属性,取决于selector.type 的值 |
|
interceptors | – | 该 Source 所使用的拦截器,多个用空格分开 |
interceptors.* | 拦截器相关的属性配置 |
HDFS Sink
HDFS Sink,组件 type 为 hdfs
:
- 支持创建文本和序列文件,以及压缩这两种类型的文件。
- 可以根据经过的时间、文件大小或 event 数量定期滚动文件。滚动(roll)是指将临时文件以重命名的方式生成最终目标文件,会关闭当前文件并创建一个新的临时文件来存储新数据。
- 它还根据时间戳或产生 event 的机器之类的属性对数据进行分桶/分区。
- HDFS 目录路径可能包含转义字符串(如
%{host}
/%H
/%M
),这些转义字符串将由 HDFS Sink 自动替换,生成对应的目录或文件名。
Alias | Description |
---|---|
%{host} | 替换 event header 中 key 为 host 对应的 value。这个 host 可以是任意的,比如 %{aabc} 将读取 header 中 key 为 aabc 对应的 value |
%t | Unix 时间戳(精度:毫秒) |
%a | locale’s short weekday name (Mon, Tue, …) |
%A | locale’s full weekday name (Monday, Tuesday, …) |
%b | locale’s short month name (Jan, Feb, …) |
%B | locale’s long month name (January, February, …) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | 月份中的天(00到31) |
%e | 月份中的天(1到31) |
%D | 日期,与 %m/%d/%y 相同 ,例如:02/09/19 |
%H | 小时(00到23) |
%I | 小时(01到12) |
%j | 年中的天数(001到366) |
%k | 小时(0到23),注意跟 %H的区别 |
%m | 月份(01到12) |
%n | 月份(1到12,不填充0) |
%M | 分钟(00到59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | 秒(00到59) |
%y | 一年中的最后两位数(00到99),比如1998年的 %y 就是98 |
%Y | 年(2010) |
%z | 数字时区(比如:-0400) |
%[localhost] | Agent 实例所在主机的 hostname |
%[IP] | Agent实例所在主机的 IP |
%[FQDN] | Agent 实例所在主机的完全限定域名 |
注意:%[localhost]
、%[IP]
和 %[FQDN]
这三个转义符实际上都是用 Java 的 API 来获取的,在一些网络环境下可能会获取失败。
属性名 | 默认值 | 解释 |
---|---|---|
channel | – | |
type | – | 组件类型,这个是: hdfs |
hdfs.path | – | HDFS目录路径(例如:hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Flume在HDFS文件夹下创建新文件的固定前缀 |
hdfs.fileSuffix | – | Flume在HDFS文件夹下创建新文件的后缀(比如:.avro ,注意这个“.”不会自动添加,需要显式配置) |
hdfs.inUsePrefix | – | Flume正在写入的临时文件前缀,默认没有 |
hdfs.inUseSuffix | .tmp |
Flume正在写入的临时文件后缀 |
hdfs.rollInterval | 30 | 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 |
hdfs.rollSize | 1024 | 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 |
hdfs.rollCount | 10 | 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) |
hdfs.idleTimeout | 0 | 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒 |
hdfs.batchSize | 100 | 向 HDFS 写入内容时每次批量操作的 Event 数量 |
hdfs.codeC | – | 压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop 、 ``snappy` |
hdfs.fileType | SequenceFile | 文件格式,目前支持: SequenceFile 、 DataStream 和 CompressedStream 。 1. DataStream 不会压缩文件,不能设置hdfs.codeC 2. CompressedStream 必须设置可用的hdfs.codeC 参数 |
hdfs.maxOpenFiles | 5000 | 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭 |
hdfs.minBlockReplicas | – | 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。 |
hdfs.writeFormat | Writable | 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text ,否则 Impala 或 Hive 无法读取这些文件。 |
hdfs.callTimeout | 10000 | 允许HDFS操作文件的时间(单位:毫秒),比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。 |
hdfs.threadsPoolSize | 10 | 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等) |
hdfs.rollTimerPoolSize | 1 | 每个HDFS Sink实例调度定时文件滚动的线程数 |
hdfs.kerberosPrincipal | – | 用于安全访问 HDFS 的 Kerberos 用户主体 |
hdfs.kerberosKeytab | – | 用于安全访问 HDFS 的 Kerberos keytab 文件 |
hdfs.proxyUser | 代理名 | |
hdfs.round | false | 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符) |
hdfs.roundValue | 1 | 向下舍入到小于当前时间的最高倍数,默认为1,范围为1~60(使用hdfs.roundUnit 配置的单位)。例子:假设当前时间是18:32:01,hdfs.roundUnit = minute ,如果 roundValue = 5,则时间戳会取为 18:30;如果 roundValue =7,则时间戳会取为 18:28;如果 roundValue =10,则时间戳会取为:18:30。 |
hdfs.roundUnit | second | 向下舍入时的单位,可选值: second 、 minute 、 hour |
hdfs.timeZone | Local Time | 解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai |
hdfs.useLocalTimeStamp | false | 使用日期时间转义符时是否使用本地时间戳(而不是使用 event header 中自带的时间戳) |
hdfs.closeTries | 0 | 开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。 如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀; 如果设置为0,Sink会一直尝试重命名文件直到成功为止; 关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。 |
hdfs.retryInterval | 180 | 连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。 |
serializer | TEXT |
Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。 |
serializer.* | 根据上面 serializer 配置的类型来根据需要添加序列化器的参数 |
Memory Channel
Memory Channel,组件 type 为 memory
:
- event 存储在内存队列中,该队列可配置最大值。
- 对于需要更高吞吐量、允许在 agent 发生故障时丢失分段数据的流而言,它是理想的选择。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: memory |
capacity | 100 | Channel 中存储 event 的最大数量 |
transactionCapacity | 100 | 每次事务中可以从 source 中拿到或者送到 sink 中的 event 的最大数量(不能比 capacity 大) |
keep-alive | 3 | 添加或删除一个 event 的超时时间,单位:秒 |
byteCapacityBufferPercentage | 20 | 指定 event header 所占空间大小与 channel 中所有 event 的总大小之间的百分比 |
byteCapacity | see description | Channel 中最大允许存储所有 event 的总字节数。默认值为 JVM 可用最大内存的80%(即命令行上传递的 -Xmx 值的80%)。 计算总字节时只计算 event body,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当一个 JVM 里面有多个 Memory Channel,并且它们恰好持有相同的物理 event(例如:这些 Channel 通过复制通道选择器接收同一个 Source 中的 event),此时这些 event 占用的空间是累加的,而不会只计算一次。如果这个值设置为 0 (不限制),就会达到 200G 左右的内部硬件限制。 |
如何理解 byteCapacity 和 byteCapacityBufferPercentage?
例子1: byteCapacityBufferPercentage
设置为 20
, byteCapacity
设置为 52428800
(即 50 M),Memory Channel 可用内存是 50 M,此时内存中所有 event body 的总大小就被限制为 50 M *(1-20%) = 40 M。
例子2: byteCapacityBufferPercentage
设置为 10
, byteCapacity
不设置,Memory Channel 可用内存是 80 M,此时内存中所有 event body 的总大小就被限制为 100 M * 80% *(1-10%) = 72 M。
File Channel
File Channel,组件 type 为 file
:
- 默认情况下,File Channel 使用默认的用户主目录内的 checkpoint 目录和数据目录的路径。因此:1.如果 Agent 中有多个活动的 File Channel 实例,则只有一个能够锁定目录并导致其他的 Channel 初始化失败;2.因此有必要为所有已配置的 Channel 显式配置不同的 checkpoint 目录,最好是在不同的磁盘上。
- 由于 File Channel 将在每次提交后同步到磁盘,因此将其与和 event 一起批处理的 sink/source 耦合在一起可能很有必要,以便在多个磁盘不可用于 checkpoint 和数据目录时提供良好的性能。
Property Name Default | Description | |
---|---|---|
type | – | 组件类型,这个是: file |
checkpointDir | ~/.flume/file-channel/checkpoint | 存储 checkpoint 的目录 |
useDualCheckpoints | false | 是否备份 checkpoint。如果设置为 true ,必须设置 backupCheckpointDir 参数 |
backupCheckpointDir | – | 备份 checkpoint 的目录。 此目录不能与数据目录或 checkpointDir 相同 |
dataDirs | ~/.flume/file-channel/data | 用逗号分隔的目录列表,用于存储日志文件。在不同的磁盘上使用多个目录可以提高 File Channel 的性能 |
transactionCapacity | 10000 | Channel 支持的单个事务的最大容量 |
checkpointInterval | 30000 | 检查点的时间间隔(毫秒) |
maxFileSize | 2146435071 | 单个日志文件的最大字节数。这个默认值约等于 2047 MB |
minimumRequiredSpace | 524288000 | 最小空闲空间的字节数。为了避免数据损坏,当空闲空间低于这个值的时候,File Channel 将拒绝一切存取请求 |
capacity | 1000000 | Channel 的最大容量 |
keep-alive | 3 | 存入 event 的最大等待时间(秒) |
use-log-replay-v1 | false | (专家)是否使用老的回放逻辑 (Flume 默认是使用v2版本的回放方法,但是如果v2版本不能正常工作可以考虑通过这个参数改为使用v1版本,v1版本是从Flume1.2开始启用的,回放是指系统关闭或者崩溃前执行的校验检查点文件和文件channel记录是否一致程序) |
use-fast-replay | false | (专家)是否开启快速回放(不使用队列) |
checkpointOnClose | true | 控制 Channel 关闭时是否创建 checkpoint。在关闭位置创建 checkpoint 可以避免回放,从而加快了 File Channel 的后续启动速度 |
encryption.activeKey | – | 加密数据所使用的 key 名称 |
encryption.cipherProvider | – | 加密类型,支持的类型:AESCTRNOPADDING |
encryption.keyProvider | – | 密钥提供者类型,支持的类型: JCEKSFILE |
encryption.keyProvider.keyStoreFile | – | keystore 文件路径 |
encrpytion.keyProvider.keyStorePasswordFile | – | keystore 密码文件路径 |
encryption.keyProvider.keys | – | 所有 key 的列表,包含所有使用过的加密key名称 |
encyption.keyProvider.keys.*.passwordFile | – | 可选的秘钥密码文件路径 |
配置采集方案
创建 spooldir-hdfs.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
启动Flume
启动 Flume,等待往监视目录添加文件:
1 | bin/flume-ng agent -c conf/ -f conf/spooldir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console |
移动文件到目录
再建立一个 node03 的连接,创建三个测试文件 test1.txt
、 test1.txt
、 test1.txt
。
移动 test1.txt
到监视目录,观察控制台输出:
同时将test1.txt
、 test1.txt
两个文件移动到监视目录,观察控制台输出:
查看 HDFS 中的文件,打印文件内容:
采集文件到HDFS
需求
某业务系统的日志内容不断增加,需要把日志文件中追加的数据实时采集到 HDFS。
分析
Source 使用 exec
,Sink 使用 hdfs
,Channel 可用 memory
或 file
。
Exec Source
Exec Source,组件 type 为 exec
:
- 启动时会运行给定的 Unix 命令(如
cat [named pipe]
、tail -f [file]
),该 Unix 命令往往是能在标准输出上连续产生数据(而stderr
会直接被丢弃 ,除非将属性logStdErr
设置为true
)。如果该 Unix 命令由于某种原因而退出,则该 Source 也将退出并且将不再产生任何数据(如date
命令不能连续产生数据流,在产生数据后便退出)。 exec
源无法保证是可靠地。比如使用tail -f [file]
时,Flume 会将指定文件的每一行作为 event 发送到 Channel,但如果 Channel 已满,数据将会丢失。想要获得更高的可靠性,建议使用 Spooling Directory Source、Taildir Source 或通过 SDK 与 Flume 直接集成。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | 组件类型,这个是: exec |
command | – | 要执行的命令,一般是 cat 或 tail |
shell | – | 设置用于运行命令的 shell,例如 /bin/sh -c 。仅适用于依赖 shell 功能的命令,例如:通配符、back ticks、管道等 |
restartThrottle | 10000 | 尝试重新启动之前等待的时间,单位:毫秒 |
restart | false | 如果执行的命令挂掉,是否重新启动 |
logStdErr | false | 是否记录命令的 stderr |
batchSize | 20 | 单次读取并发送到 Channel 的最大行数 |
batchTimeout | 3000 | 在未达到缓冲区大小的情况下,将数据推送到下游之前要等待的时间,单位:毫秒 |
selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector.* | Channel 选择器的相关属性,取决于 selector.type 的值 |
|
interceptors | – | 以空格分隔的拦截器列表 |
interceptors.* | 拦截器相关的属性配置 |
配置采集方案
创建 tailfile-hdfs.conf
,配置采集方案:
1 | # Name the components on this agent |
启动Flume
启动 Flume,等待往文件追加数据:
1 | bin/flume-ng agent -c conf -f conf/tailfile-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console |
追加文件内容
创建 Shell 脚本文件 tailfile.sh
,用来往文件中追加内容:
1 |
|
执行脚本,观察控制台输出:
查看 HDFS 中的文件,打印文件内容:
级联
需求
第一个 agent 负责采集日志文件中的数据,并通过网络发送到第二个 agent;第二个 agent 负责接收第一个 agent 发送的数据,并将数据保存到 HDFS。两个 agent 位于两个不同的服务器。
分析
agent1:位于 node02,Source 使用 exec
,Sink 使用 avro
,Channel 使用 memory
。
agent2:位于 node03,Source 使用 avro
,Sink 使用 hdfs
,Channel 使用 memory
。
Avro Source
Avro Source,组件 type 为 avro
:
- 监听 Avro 端口,并从外部 Avro 客户端流接收 event。
- 与另一个 Flume agent 上的内置 Avro Sink 配对时,可以创建出分层的采集拓扑结构。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | 组件类型,这个是: avro |
bind | – | 监听的服务器名 hostname 或 ip |
port | – | 监听的端口 |
threads | – | 产生的最大工作线程数 |
selector.type | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 |
|
selector.* | Channel 选择器的相关属性,取决于 selector.type 的值 |
|
interceptors | – | 以空格分隔的拦截器列表 |
interceptors.* | ||
compression-type | none | 压缩类型,可选值: none 或 deflate 。这个类型必须跟上一级的 AvroSink 相匹配 |
ssl | false | 将此设置为 true 以启用SSL加密,同时还必须指定 keystore 和 keystore-password |
keystore | – | Java keystore 文件的路径。SSL 必需。 |
keystore-password | – | Java keystore 的密码。SSL 必需。 |
keystore-type | JKS | Java keystore 的类型。可以是 JKS 或 PKCS12 |
exclude-protocols | SSLv3 | 要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议外,还将始终排除SSLv3 |
ipFilter | false | 将此设置为 true 可启用 netty 的 ipFiltering |
ipFilterRules | – | 使用此配置定义N个 netty ipFilter 模式规则 |
Avro Sink
Avro Sink,组件 type 为 avro
:
- 该 Sink 用于构成 Flume 分层采集拓扑结构的另一半。发送到该 Sink 的 Flume event 将转换为 Avro event,并发送到已配置的主机名/端口。这些 event 是从已配置的 Channel 中以已配置的批次大小批量获取的。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | 组件类型,这个是: avro |
hostname | – | 要绑定的 hostname 或 IP 地址 |
port | – | 要监听的端口号 |
batch-size | 100 | 每批量发送的 event 数量 |
connect-timeout | 20000 | 第一次连接请求(握手)的超时时间,单位:毫秒 |
request-timeout | 20000 | 请求超时时间,单位:毫秒 |
reset-connection-interval | none | 重置与下一跳的连接之前的时间(秒)。这将强制 Avro Sink 重新连接到下一跳。这将允许 Sink 在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent |
compression-type | none | 压缩类型,可选值: none 或 deflate 。这个类型必须跟下一级的 AvroSource 相匹配 |
compression-level | 6 | event 的压缩级别。0 = 无压缩,1-9 为压缩。数字越高,压缩越多 |
ssl | false | 设置为 true 可以为此 AvroSink 启用 SSL。配置 SSL 时,可以选择设置 truststore 、truststore-password 、truststore-type ,并指定是否 trust-all-certs |
trust-all-certs | false | 如果将其设置为 true ,将不检查远程服务器(Avro Source)的 SSL 服务器证书。这不应在生产中使用,因为它使攻击者更容易执行中间人攻击并“监听”加密的连接 |
truststore | – | 定制 Java truststore 文件的路径。Flume 使用此文件中的证书颁发机构信息来确定是否应信任远程 Avro Source 的 SSL 身份验证凭据。如果未指定,将使用默认的 Java JSSE 证书颁发机构文件(在 Oracle JRE 中通常为 jssecacerts 或 cacerts ) |
truststore-password | – | 指定的 truststore 的密码 |
truststore-type | JKS | Java truststore 的类型。可以是 JKS 或其他受支持的 Java truststore 类型 |
exclude-protocols | SSLv3 | 要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议外,还将始终排除 SSLv3 |
maxIoWorkers | 2 * 机器中可用处理器的数量 | I/O 工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的 |
配置采集方案
node02 创建 tailfile-avro.conf
:
1 | # 命名此agent上的各组件 |
node03 创建 avro-hdfs.conf
:
1 | # 命名此agent上的各组件 |
启动Flume
先启动 node03 上的 Flume,等待和 node02 建立连接,以及接收数据:
1 | bin/flume-ng agent -c conf/ -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console |
后启动 node02 上的 Flume,等待采集数据:
1 | bin/flume-ng agent -c conf/ -f conf/tailfile-avro.conf -n a1 -Dflume.root.logger=INFO,console |
追加文件内容
创建 Shell 脚本文件 tailfile.sh
,用来往文件中追加内容。
执行脚本,观察 node03 控制台输出:
查看 HDFS 中的文件,打印文件内容:
可以发现数据存放在两个目录下,原因是在 avro-hdfs.conf
中 hdfs
Sink 的配置:
- 在
hdfs.path
的路径中配置了.../%H%M/
,分钟作为转义字符串出现在目录的路径中。 - 通过
hdfs.roundValue
和hdfs.roundUnit
配置了对时间戳以 10 min 为单位向下舍去。
故障转移
需求
使用 Flume NG 本身提供的 failover 机制,搭建一个高可用的 Flume NG 集群,实现自动切换和恢复。
分析
使用三个节点搭建 Flume 的高可用集群,角色分配如下:
名称 | 主机 | 角色 |
---|---|---|
Agent1 | node01 | 采集日志文件,将采集数据发送到 Agent2 或 Agent3 |
Agent2 | node02 | 主 Collector,优先接收 Agnet1 采集的数据,存入 HDFS |
Agent3 | node03 | 备 Collector,如果主 Collector 发生故障,代替接收 Agnet1 采集的数据,存入 HDFS |
第一个 agent:位于 node01,Source 使用 exec
,Sink 使用两个 avro
分别连接 Agent2 和 Agent3,Channel 使用 memory
。此外还配置 Sink 组逻辑处理器(Sink Processors),类型设置为 failover
。
第二个 agent:位于 node02,Source 使用 avro
,Sink 使用 hdfs
,Channel 使用 memory
。
第三个 agent:位于 node03,配置和 node02 上的一样 。
Failover Sink Processor
Failover Sink Processor,组件 type 为 failover
:
- Failover Sink Processor 维护了一个 Sink 的优先级列表,以确保发送的 event 能够得到处理(交付)。
- 故障转移机制的工作原理是给故障的 Sink 分配一个退避时间并下放到“冷却池”,退避时间会随着故障次数的增加而增加(直到设置的最大退避时间)。 Sink 成功发送 event 后,它将恢复到“存活池”。Sink 有着与之关联优先级,数值越大,优先级越高。 如果一个 Sink 在发送 event 时发生故障,会尝试下一个优先级最高的 Sink。例如,优先级为 100 的 Sink 会先于优先级为 80 的 Sink 被激活。如果未指定优先级,则根据 Sink 的配置顺序来选取。
- 要使用故障转移选择器,不仅要设置 Sink 组处理器的 type 为
failover
,还要为每一个 Sink 设置一个唯一的优先级数值。 可以使用maxpenalty
属性设置故障转移时间的上限(毫秒)。
属性 | 默认值 | 解释 |
---|---|---|
sinks | – | 这一组所有 Sink 的名字,多个用空格分开 |
processor.type | default | 组件类型,这个是: failover |
processor.priority.<sinkName> | – | 组内 Sink 的权重值,<sinkName> 必须是当前组关联的 Sink 之一。数值(绝对值)越高越早被激活 |
processor.maxpenalty | 30000 | 发生异常的 Sink 的最大退避时间(毫秒) |
Agent1
创建 failover-agent1.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
Agent2
创建 failover-agent2.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
Agent3
创建 failover-agent3.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
启动Flume
先启动 node02 和 node03 上的 Flume,等待和 node01 建立连接,以接收数据:
1 | bin/flume-ng agent -n agent3 -c conf/ -f conf/agent3.conf -Dflume.root.logger=DEBUG,console |
后启动 node01 上的 Flume,等待采集数据:
1 | bin/flume-ng agent -n agent1 -c conf/ -f conf/agent1.conf -Dflume.root.logger=DEBUG,console |
追加文件内容
在 node01 执行 Shell 脚本文件 tailfile.sh
,用来往文件中追加内容。
由于 Agent2 的权重大,观察到 node02 的控制台中输出了文件采集到 HDFS 的信息。此时关闭 Agent2,会由 Agent3 代替执行相应的工作。如果再次启动 Agent2,相应的采集工作会由 Agent3 转移到 Agent2。
打印 HDFS 中的文件列表,发现在下图框起来的部分出现了两次断层,这三部分文件分别由 Agent2、Agent3、Agent2 Sink 到 HDFS 的:
查看文件内容,发现第一次断层处出现了数据丢失,第二次断层处没有数据丢失,说明在故障转移过程中、 Exec Source 在执行 tail
命令时,存在数据丢失现象。而且是在 Agent3 接替因故障离线的 Agent2 时出现的数据丢失;当 Agent2 恢复使用,采集任务平稳地从 Agent3 转移到 Agent2 时,没有出现数据丢失的情况。
Agent2 转移到 Agent3:
Agent3 转移到 Agent2:
负载均衡
需求
使用 Flume NG 本身提供的 load balance 机制,搭建一个负载均衡的 Flume NG 集群,实现负载的轮询分发。
分析
使用三个节点搭建 Flume 的高可用集群,角色分配如下:
名称 | 主机 | 角色 |
---|---|---|
Agent1 | node01 | 采集日志文件,将采集数据发送到 Agent2 或 Agent3 |
Agent2 | node02 | Collector,接收 Agnet1 采集的数据,存入 HDFS |
Agent3 | node03 | Collector,接收 Agnet1 采集的数据,存入 HDFS |
第一个 agent:位于 node01,Source 使用 exec
,Sink 使用两个 avro
分别连接 Agent2 和 Agent3,Channel 使用 memory
。此外还配置 Sink 组逻辑处理器(Sink Processors),类型设置为 load_balance
。
第二个 agent:位于 node02,Source 使用 avro
,Sink 使用 logger
,Channel 使用 memory
。
第三个 agent:位于 node03,配置和 node02 上的一样 。
Load balancing Sink Processor
Load balancing Sink Processor,组件 type 为 load_balance
:
- Load balancing Sink Processor 提供了负载均衡多个 Sink 上的流量的功能。 它维护着一个必须在其上分配负载的活动 Sink 列表的索引。 支持轮询(
round_robin
)和随机(random
)两种选择机制来分配负载。选择机制默认为round_robin
,可以通过配置更改,也支持继承了AbstractSinkSelector
的自定义选择器。 - 调用时,此选择器使用其配置的选择机制选择下一个 Sink 并调用它。如果所选 Sink 无法正常工作,则处理器通过其配置的选择机制选择下一个可用 Sink。 此实现不会将失败的 Sink 列入黑名单,而是继续乐观地尝试每个可用的 Sink。如果所有 Sink 的调用结果都为失败,选择器会将故障抛给 sink runner。
- 如果
backoff
设置为true
,失败的 Sink 会被列入黑名单,达到一定的超时时间后再自动从黑名单放出。 如从黑名单出来后 Sink 仍然无法响应,则再次进入黑名单时设定的超时时间会翻倍,以免陷入对无响应 Sink 的长时间等待中。 如果禁用此功能,所有失败的 Sink 的负载都将传递到线路中的下一个 Sink,无法均衡地平衡。
属性 | 默认值 | 解释 |
---|---|---|
processor.sinks | – | 这一组所有 Sink 的名字,多个用空格分开 |
processor.type | default | 组件类型,这个是: load_balance |
processor.backoff | false | 退避机制 |
processor.selector | round_robin | 选择机制,可选值:round_robin (轮询)、random (随机)、「自定义类的FQCN」:继承了 AbstractSinkSelector 的自定义选择器 |
processor.selector.maxTimeOut | 30000 | 发生异常的Sink的最长退避时间(毫秒) |
Agent1
创建 load_balance-agent1.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
Agent2
创建 load_balance-agent2.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
Agent3
创建 load_balance-agent3.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
启动Flume
先启动 node02 和 node03 上的 Flume,等待和 node01 建立连接,以接收数据:
1 | bin/flume-ng agent -n agent3 -c conf/ -f conf/load_balance-agent3.conf -Dflume.root.logger=DEBUG,console |
后启动 node01 上的 Flume,等待采集数据:
1 | bin/flume-ng agent -n agent1 -c conf/ -f conf/load_balance-agent1.conf -Dflume.root.logger=DEBUG,console |
追加文件内容
在 node01 执行 Shell 脚本文件 tailtime.sh
,用来往文件中追加内容:
1 |
|
观察 node02 和 node03 控制台的输出,可见 Agent2 和 Agent3 交替接收与处理 Agent1 采集来的数据:
拦截器
Flume 支持在运行时对 event 进行修改或丢弃,可以通过拦截器来实现。Flume 里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor
接口的类。拦截器可以根据开发者的意图随意修改甚至丢弃 event。Flume 也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了。拦截器的顺序取决于它们被初始化的顺序(实际也就是配置的顺序),event 就这样按照顺序经过每一个拦截器,如果想在拦截器里面丢弃 event, 在传递给下一级拦截器的 list 里面把它移除就行了。如果想丢弃所有的 event,返回一个空集合就行了。
需求
把 node01、node02 两台服务器中的日志文件 access.log
、nginx.log
、web.log
采集汇总到 node03 并分类收集到 HDFS 中,要求在 HDFS 中的存储路径如下:
1 | /flume/logs/access/20200220/events.1582189038642 |
分析
三个节点的角色分配如下:
名称 | 主机 | 角色 |
---|---|---|
Agent1 | node01 | 采集日志文件,将采集数据发送到 Agent3 |
Agent2 | node02 | 采集日志文件,将采集数据发送到 Agent3 |
Agent3 | node03 | Collector,接收采集的数据,存入 HDFS |
Agent1:位于 node01,Source 使用 exec
,Sink 使用 avro
连接到 Agent3,Channel 使用 memory
。此外还配置静态拦截器(Static Interceptor),在 event header 中记录采集文件的类型。
Agent2:位于 node02,配置和 node01 上的一样 。
Agent3:位于 node03,Source 使用 avro
,Sink 使用 hdfs
,Channel 使用 memory
。
Static Interceptor
Static Interceptor,组件 type 为 static
:
- Static Interceptor 可以向 event header 中写入一个固定的键值对属性。
- 这个拦截器目前不支持写入多个属性,但是你可以通过配置多个 Static Interceptor 来实现。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: static |
preserveExisting | true | 如果header中已经存在同名的属性是否保留 |
key | key | 写入header的key |
value | value | 写入header的值 |
Agent1
创建 interceptors-agent1.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
Agent2
创建 interceptors-agent2.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
Agent3
创建 interceptors-agent3.conf
,配置采集方案:
1 | # 命名此agent上的各组件 |
启动Flume
先启动 node03 上的 Flume,等待和 node01、node02 建立连接,以接收数据:
1 | bin/flume-ng agent -n agent3 -c conf -f conf/interceptors-agent3.conf -Dflume.root.logger=DEBUG,console |
后启动 node01 上的 Flume,等待采集数据:
1 | bin/flume-ng agent -n agent1 -c conf -f conf/interceptors-agent1.conf -Dflume.root.logger=DEBUG,console |
追加文件内容
在 node01 和 node02 执行 Shell 脚本文件 tailtimes.sh
,用来往文件中追加内容:
1 |
|
打印 HDFS 中的文件列表,发现根据 header 中的信息在 HDFS 生成了 access
、nginx
、web
三个目录;查看文件内容,发现来自不同服务器的同类数据成功汇聚进同一个文件:
自定义拦截器
需求
在数据采集之后,通过 Flume 的拦截器,过滤掉不需要的数据,并将指定的第一个字段进行 MD5 加密,加密之后保存 到 HDFS上面。
分析
创建工程
1 | <repositories> |
自定义拦截器
1 | package moe.sannaha.iterceptor; |
打包
将自定义拦截器打包得到 flume_interceptor.jar
并放到flume的 lib
目录下。
配置采集方案
创建 custom-interceptor.conf
,配置采集方案:
1 | c1 = |
启动Flume
1 | bin/flume-ng agent -c -n a1 conf -f conf/spool-interceptor-hdfs.conf -Dflume.root.logger=DEBUG,console |
移动文件到目录
新建测试数据文件,移动到 /export/data/intercept/
目录下:
1 | 13601249301 100 200 300 400 500 600 700 |
Flume 1.6.0 User Guide
Flume 1.8用户手册中文版
Using Flume by Hari Shreedharan - Channels
Flume NG:Flume 发展史上的第一次革命