0%

Flink 快速上手

Flink头图

Apache Flink 是一个开源的流处理框架,具有强大的 流处理批处理 能力,主要由 Java 代码实现,支持使用 Java、Scala 和 Python 等语言开发。

Flink 的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同时也可以作为独立集群运行。


概述

简介

Apache Flink 是一个框架和分布式处理引擎,用于在无边界有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

处理无界和有界数据
任何类型的数据都可以形成一种事件流。数据可以被作为 无界 或者 有界 流来处理。

  1. 无界流:有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

  2. 有界流:有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

有界流-无界流

Apache Flink 擅长处理无界和有界数据集。精确的时间控制和状态化使得 Flink 的运行时 (runtime) 能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

利用内存性能
有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

local-state

对比Storm和Spark

Flink、Spark Streaming、Storm、Storm Trient都可以进行实时计算,但各有特点:

计算框架 处理模型 保证次数 容错机制 延时 吞吐量
Storm native(数据进入立即处理) At-least-once ACK机制
Storm Trident micro-batching(划分小批处理) Exactly-once ACK机制
Spark Streaming micro-batching Exactly-once 基于RDD和 checkpoint
Flik native、micro-batching Exactly-once checkpoint(Flink快照)

Blink Flink 的一个分支,由阿里打造,对 Flink 进行高度定制。

安装

开发环境

  1. 三台虚拟机
1
2
3
192.168.153.100  node01
192.168.153.101 node02
192.168.153.102 node03
  1. 目录结构
1
2
/export/softwares:存放软件安装包
​/export/servers:软件安装目录
  1. 软件环境
1
2
3
java: jdk1.8.0_141
zookeeper: zookeeper-3.4.5-cdh5.14.0
hadoop: hadoop-2.6.0-cdh5.14.0

下载解压

  1. 下载 Flink 到 node01
/export/softwares/
1
wget http://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop26-scala_2.11.tgz
  1. node01 上解压 Flink
1
tar -zxvf /export/softwares/flink-1.6.0-bin-hadoop26-scala_2.11.tgz -C /export/servers/

选择安装模式

Flink 支持多种安装模式:

  • Local:本地模式,将 Flink 运行在单机,一般不使用
  • StandAlone:独立部署模式,使用 Flink 自带的集群,由 Jobmanager(主) 和 Taskmanager(从) 组成,用于开发、测试环境
  • YARN:将 Flink 运行在 YARN 集群上,由 YARN 统一管理分配资源,不需要额外构建 Flink 集群,用于生产环境

Flink参数配置一览: Configuration

StandAlone

StandAlone

  • Client:客户端,提交任务给 JobManager
  • JobManager:主节点,负责管理 Flink 集群计算资源,并分发任务给 TaskManager
  • TaskManager:从节点,负责执行任务,定期向 JobManager 汇报状态

官方教程: Standalone Cluster

配置

  1. node01 上修改 flink-conf.yaml 文件
/export/servers/flink-1.6.0/conf/flink-conf.yaml
1
2
3
4
5
6
# jobmanager.rpc.address: localhost
+ # 配置Master节点的地址
+ jobmanager.rpc.address: node01

+ # 配置每个taskmanager生成的临时文件目录
+ taskmanager.tmp.dirs: /export/servers/flink-1.6.0/tmp
  1. node01 上修改 slaves 文件
/export/servers/flink-1.6.0/conf/slaves
1
2
3
4
- localhost
+ node01
+ node02
+ node03
  1. 将 Flink 分发到 node02 和 node03
1
2
scp -r /export/servers/flink-1.6.0/ node02:/export/servers/
scp -r /export/servers/flink-1.6.0/ node03:/export/servers/
  1. 修改环境变量,添加HADOOP_CONF_DIR目录,在每个节点上执行
1
2
# vim /etc/profile
export HADOOP_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
  1. 重新加载环境变量,在每个节点上执行
1
source /etc/profile

启动

前提:启动了 HDFS 集群

启动 Flink 集群:

1
/export/servers/flink-1.6.0/bin/start-cluster.sh

查看监控页面: Apache Flink Dashboard

Flink监控页面

测试

  1. 创建wordcount.txt文件,文件内容如下
1
2
3
hadoop,spark,zookeeper
spark,scala
hadoop,impala
  1. 在 HDFS 上创建目录,上传wordcount.txt文件
1
2
hdfs dfs -mkdir -p /test/input
hdfs dfs -put /export/data/wordcount.txt /test/input
  1. 运行测试任务,统计词频,指定输入路径和输出路径
1
2
3
4
5
6
7
8
9
# /export/servers/flink-1.6.0/bin/flink run /export/servers/flink-1.6.0/examples/batch/WordCount.jar \
--input hdfs://node01:8020/test/input/wordcount.txt \
--output hdfs://node01:8020/test/output/result.txt

2019-07-23 15:14:27,684 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Starting execution of program
Program execution finished
Job with JobID fb1448c5840d02939983b7e3c5c3d7b4 has finished.
Job Runtime: 21916 ms
  1. 查看运行结果
1
2
3
4
5
6
# hdfs dfs -cat /test/output/result.txt
hadoop 2
impala 1
scala 1
spark 2
zookeeper 1

StandAlone HA

StandAlone-HA

  • Client:客户端,提交任务给 JobManager
  • JobManager:主节点,负责管理 Flink 集群计算资源,并分发任务给 TaskManager
  • TaskManager:从节点,负责执行任务,定期向 JobManager 汇报状态
  • Zookeeper:管理集群运行,实现 JobManager 主备切换

官方教程: YARN Setup

配置

  1. node01 上修改 flink-conf.yaml 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# jobmanager.rpc.address: localhost
# 配置jobmanager的IP地址
jobmanager.rpc.address: node01

# 配置每个taskmanager生成的临时文件夹
taskmanager.tmp.dirs: /export/servers/flink-1.6.0/tmp

# high-availability: zookeeper
+ # 使用zookeeper实现高可用
+ high-availability: zookeeper

# high-availability.storageDir: hdfs:///flink/ha/
+ # 存储JobManager的元数据到HDFS
+ high-availability.storageDir: hdfs://node01:8020/flink/ha/

# high-availability.zookeeper.quorum: localhost:2181
+ high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181

# state.backend: filesystem
+ # 开启HA,使用文件系统作为快照存储
+ state.backend: filesystem

# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+ # 启用检查点,将快照保存到HDFS
+ state.checkpoints.dir: hdfs://node01:8020/flink-checkpoints
  1. node01 上修改 slaves 文件,添加多个从节点
/export/servers/flink-1.6.0/conf/slaves
1
2
3
node01
node02
node03
  1. node01 上修改 master 文件,添加多个主节点
1
2
node01:8081
node02:8081
  1. 将 Flink 分发到 node02 和 node03
1
2
scp -r /export/servers/flink-1.6.0/ node02:/export/servers/
scp -r /export/servers/flink-1.6.0/ node03:/export/servers/
  1. node02 上修改 flink-conf.yaml,将 jobmanager 设置为 node02 的地址
1
2
- jobmanager.rpc.address: node01
+ jobmanager.rpc.address: node02
  1. 修改环境变量
1
2
# vim /etc/profile
export HADOOP_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
  1. 重新加载环境变量
1
source /etc/profile

启动

前提:启动了 Zookeeper 集群和 HDFS 集群

启动 Flink 集群:

1
/export/servers/flink-1.6.0/bin/start-cluster.sh

查看监控页面: node01 node02

注:如果访问的页面是备节点的,会重定向到主节点的页面。

YARN

YARN

注:Uberjar,也叫Fatjar,包含原生jar的所有依赖。Über为德语,意为overabroveUberjar表示比简单的jar高级

在生产环境中,需要将 Flink 运行到 YARN 集群上,使用 YARN 来管理所有计算资源,不用额外构建 Flink 集群。只需在任意一台机器上提交作业到 YARN 集群即可。

官方教程: YARN Setup

配置

YARN 模式使用 YARN 集群,无需配置 Flink 集群。

注:修改 Hadoop 的 yarn-site.xml文件,配置为不自动检查内存,以免运行的 Flink 程序超过分配的内存,导致任务被杀掉。

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/yarn-site.xml
1
2
3
4
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

启动

前提:启动 HDFS、YARN 集群

启动 Flink Session

Session 将启动所有必需的 Flink 服务(JobManager 和 TaskManagers),以便将程序提交到集群。可以在每个会话中运行多个程序,适用于有大量小作业的场景。

/export/servers/flink-1.6.0/
1
2
3
4
5
6
7
8
9
10
11
12
bin/yarn-session.sh -tm 800 -s 1

参数:
-D <arg> 动态属性
-d,--detached 以后台模式启动
-jm,--jobManagerMemory <arg> 每个JobManager申请多少内存(MB)
-nm,--name 在YARN上为应用程序设置自定义名称
-q,--query 显示可用的YARN资源(内存,内核)
-qu,--queue <arg> 指定YARN队列
-s,--slots <arg> 每个TaskManager的插槽数
-tm,--taskManagerMemory <arg> 每个TaskManager申请多少内存(MB)
-z,--zookeeperNamespace <arg> 命名空间,用于为HA模式创建Zookeeper子路径

查看 YARN 的监控页面: http://node01:8088

YARN-监控页面

如果正在执行计算任务,可以点击 ApplicationMaster 查看任务执行情况。

后台模式启动 YARN Session

如果不希望 Flink YARN 客户端始终保持前台运行,还可以后台启动 YARN Session,使用的参数为-d--detached。在这种情况下,Flink YARN 客户端将仅向集群提交 Flink,然后自行关闭。

注:后台模式下可以使用 yarn application -kill <appId> 来停止 YARN Session:

1
yarn application -kill application_1563879343375_0005

提交作业

运行测试任务,统计词频,使用默认的输入路径和输出路径:

/export/servers/flink-1.6.0/
1
2
3
4
5
6
bin/flink run examples/batch/WordCount.jar

参数:
-c,--class <classname> 指定程序入口类("main" 方法或 "getPlan()" 方法),只有当 JAR 包没有在 manifest 中指定类时需要使用该参数
-m,--jobmanager <host:port> 指定 JobManager(主节点)的地址
-p,--parallelism <parallelism> 设置并行度

查看任务执行情况:

YARN-任务监控

查看输出结果:

YARN-输出结果

Flink编程初体验

需求:编写 Flink 程序,用来统计单词的数量。

使用批处理和流处理两种方式实现需求。

批处理程序

  1. 在 IDEA 中创建 Maven 项目
  2. 导入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
 <properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.2</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<hadoop.version>2.6.0</hadoop.version>
<flink.version>1.6.0</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>


<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!--<arg>-make:transitive</arg>-->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>

</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>moe.sannaha.batch.BatchWordCountDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
  1. 编写代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.flink.api.scala._

/**
* 写一个批处理程序
* 需求:编写 Flink 程序,用来统计单词的数量
*/
object BatchWordCountDemo {
/**
* 实现思路:
* 1. 初始化 Flink 执行环境
* 2. 构建数据源
* 3. 处理数据
* 4. 保存数据
*/
def main(args: Array[String]): Unit = {
//1. 初始化 Flink 执行环境
val env = ExecutionEnvironment.getExecutionEnvironment

//2. 构建数据源
//从内存集合中创建 Flink 能够处理的数据源
val textDataSet: DataSet[String] = env.fromCollection(
List("Hadoop,Spark,Flink", "Flink,Hive,Flume", "flume,Hadoop,Flink")
)

//3. 处理数据
//3.1 根据逗号将单个字符串切分为字符串数组,使用 flatMap 算子将数组压平,获取每个单词
val wordDataSet: DataSet[String] = textDataSet.flatMap(_.split(","))

//3.2 每个单词转换成一个元组对象,每个单词标记出现一次
val wordWithMark: DataSet[(String, Int)] = wordDataSet.map((_,1))

//3.3 按照元组的第一个字段进行分组
val groupedDataSet: GroupedDataSet[(String, Int)] = wordWithMark.groupBy(0)

//3.4 使用 Flink 内置的聚合函数进行单词的累计
val sumDataSet: AggregateDataSet[(String, Int)] = groupedDataSet.sum(1)

//4. 保存数据(这里直接在控制台打印输出)
sumDataSet.print()
}
}
  1. 查看输出结果
1
2
3
4
5
6
(Hive,1)
(Spark,1)
(Hadoop,2)
(flume,1)
(Flink,3)
(Flume,1)

流处理程序

  1. 在 IDEA 中创建 Maven 项目
  2. 导入依赖,与批处理程序相同
  3. 编写代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
object StreamWordCountDemo {
/**
* 步骤
* 1. 初始化 Flink 流处理运行环境
* 2. 构建数据源
* 3. 处理数据
* 4. 保存数据
* 5. 执行
*/
def main(args: Array[String]): Unit = {
//1. 获取流处理运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2. 构建数据源
//构建 socket 流数据源,指定 IP 地址和端口号
val socketDataStream = env.socketTextStream("node01", 9999)

//3. 处理数据
//3.1 对接收到的数据切分、压平,并转换成元组,每个单词标记出现一次
val wordWithMark: DataStream[(String, Int)] = socketDataStream.flatMap(_.split(" ")).map(_ -> 1)

//3.2 使用 keyBy 进行分流(分组)
val groupDataStream: KeyedStream[(String, Int), Tuple] = wordWithMark.keyBy(0)

//3.3 使用 timeWindow 指定窗口的长度和滑动距离(都为5秒)
//val windowDataStream: WindowedStream[(String, Int), Tuple, TimeWindow] = groupDataStream.timeWindow(Time.seconds(5))

//3.3 使用 timeWindow 指定窗口的长度(10秒)和滑动距离(5秒)
//val windowDataStream: WindowedStream[(String, Int), Tuple, TimeWindow] = groupDataStream.timeWindow(Time.seconds(10), Time.seconds(1))

//3.3 使用 countWindow 指定窗口的长度(5条)和滑动距离(3条),一个分组每接收到 3 条数据,统计过去的 5 条数据
val windowDataStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = groupDataStream.countWindow(5, 3)

//3.4 使用 sum 根据元组的第二个字段(次数)进行累加
//val resultDataStream: DataStream[(String, Int)] = windowDataStream.sum(1)

//3.4 使用 reduce 执行累加
val resultDataStream: DataStream[(String, Int)] = windowDataStream.reduce((x, y) => (x._1, x._2 + y._2))

//4. 打印输出
resultDataStream.print()

//5. 执行
env.execute()
}
}
  1. 启动程序后,在 node01 的 netcat 端发送单词数据,同时在控制台查看输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 使用 timeWindow
发送:
# nc -lk 9999
spark hive spark spark
hive spark hive
hive
hive hive
hive spark spark
接收:
4> (hadoop,1)
1> (spark,1)
1> (hive,1)
4> (flink,1)
1> (hive,1)
1> (spark,1)
3> (spring,1)
4> (hadoop,1)
4> (flink,2)

# 使用 countWindow
发送:
# nc -lk 9999
spark hive spark spark
hive spark hive
hive
hive hive
hive spark spark
接收:
1> (spark,3)
1> (hive,3)
1> (hive,5)
1> (spark,5)
  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/Flink/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!