Apache Flink 是一个开源的流处理框架,具有强大的 流处理 和 批处理 能力,主要由 Java 代码实现,支持使用 Java、Scala 和 Python 等语言开发。
Flink 的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同时也可以作为独立集群运行。
概述 简介 Apache Flink 是一个框架和分布式处理引擎,用于在无边界 和有边界 数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
处理无界和有界数据 任何类型的数据都可以形成一种事件流。数据可以被作为 无界 或者 有界 流来处理。
无界流 :有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
有界流 :有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理
Apache Flink 擅长处理无界和有界数据集 。精确的时间控制和状态化使得 Flink 的运行时 (runtime) 能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
利用内存性能 有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。
对比Storm和Spark Flink、Spark Streaming、Storm、Storm Trient都可以进行实时计算,但各有特点:
计算框架
处理模型
保证次数
容错机制
延时
吞吐量
Storm
native(数据进入立即处理)
At-least-once
ACK机制
低
低
Storm Trident
micro-batching(划分小批处理)
Exactly-once
ACK机制
中
中
Spark Streaming
micro-batching
Exactly-once
基于RDD和 checkpoint
中
高
Flik
native、micro-batching
Exactly-once
checkpoint(Flink快照)
低
高
Blink Blink 是 Flink 的一个分支,由阿里打造,对 Flink 进行高度定制。
安装 开发环境
三台虚拟机
1 2 3 192.168.153.100 node01 192.168.153.101 node02 192.168.153.102 node03
目录结构
1 2 /export/softwares:存放软件安装包 /export/servers:软件安装目录
软件环境
1 2 3 java: jdk1.8.0_141 zookeeper: zookeeper-3.4.5-cdh5.14.0 hadoop: hadoop-2.6.0-cdh5.14.0
下载解压
下载 Flink 到 node01
/export/softwares/ 1 wget http://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop26-scala_2.11.tgz
node01 上解压 Flink
1 tar -zxvf /export /softwares/flink-1.6.0-bin-hadoop26-scala_2.11.tgz -C /export /servers/
选择安装模式 Flink 支持多种安装模式:
Local:本地模式,将 Flink 运行在单机,一般不使用
StandAlone:独立部署模式,使用 Flink 自带的集群,由 Jobmanager(主) 和 Taskmanager(从) 组成,用于开发、测试环境
YARN:将 Flink 运行在 YARN 集群上,由 YARN 统一管理分配资源,不需要额外构建 Flink 集群,用于生产环境
Flink参数配置一览: Configuration
StandAlone
Client:客户端,提交任务给 JobManager
JobManager:主节点,负责管理 Flink 集群计算资源,并分发任务给 TaskManager
TaskManager:从节点,负责执行任务,定期向 JobManager 汇报状态
官方教程: Standalone Cluster
配置
node01 上修改 flink-conf.yaml
文件
/export/servers/flink-1.6.0/conf/flink-conf.yaml 1 2 3 4 5 6 # jobmanager.rpc.address: localhost + # 配置Master节点的地址 + jobmanager.rpc.address: node01 + # 配置每个taskmanager生成的临时文件目录 + taskmanager.tmp.dirs: /export/servers/flink-1.6.0/tmp
node01 上修改 slaves
文件
/export/servers/flink-1.6.0/conf/slaves 1 2 3 4 - localhost + node01 + node02 + node03
将 Flink 分发到 node02 和 node03
1 2 scp -r /export /servers/flink-1.6.0/ node02:/export /servers/ scp -r /export /servers/flink-1.6.0/ node03:/export /servers/
修改环境变量,添加HADOOP_CONF_DIR
目录,在每个节点上执行
1 2 export HADOOP_CONF_DIR=/export /servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
重新加载环境变量,在每个节点上执行
启动 前提:启动了 HDFS 集群
启动 Flink 集群:
1 /export /servers/flink-1.6.0/bin/start-cluster.sh
查看监控页面: Apache Flink Dashboard
测试
创建wordcount.txt
文件,文件内容如下
1 2 3 hadoop,spark,zookeeper spark,scala hadoop,impala
在 HDFS 上创建目录,上传wordcount.txt
文件
1 2 hdfs dfs -mkdir -p /test /input hdfs dfs -put /export /data/wordcount.txt /test /input
运行测试任务,统计词频,指定输入路径和输出路径
1 2 3 4 5 6 7 8 9 --input hdfs://node01:8020/test /input/wordcount.txt \ --output hdfs://node01:8020/test /output/result.txt 2019-07-23 15:14:27,684 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. Starting execution of program Program execution finished Job with JobID fb1448c5840d02939983b7e3c5c3d7b4 has finished. Job Runtime: 21916 ms
查看运行结果
1 2 3 4 5 6 hadoop 2 impala 1 scala 1 spark 2 zookeeper 1
StandAlone HA
Client:客户端,提交任务给 JobManager
JobManager:主节点,负责管理 Flink 集群计算资源,并分发任务给 TaskManager
TaskManager:从节点,负责执行任务,定期向 JobManager 汇报状态
Zookeeper:管理集群运行,实现 JobManager 主备切换
官方教程: YARN Setup
配置
node01 上修改 flink-conf.yaml
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # jobmanager.rpc.address: localhost # 配置jobmanager的IP地址 jobmanager.rpc.address: node01 # 配置每个taskmanager生成的临时文件夹 taskmanager.tmp.dirs: /export/servers/flink-1.6.0/tmp # high-availability: zookeeper + # 使用zookeeper实现高可用 + high-availability: zookeeper # high-availability.storageDir: hdfs:///flink/ha/ + # 存储JobManager的元数据到HDFS + high-availability.storageDir: hdfs://node01:8020/flink/ha/ # high-availability.zookeeper.quorum: localhost:2181 + high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181 # state.backend: filesystem + # 开启HA,使用文件系统作为快照存储 + state.backend: filesystem # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints + # 启用检查点,将快照保存到HDFS + state.checkpoints.dir: hdfs://node01:8020/flink-checkpoints
node01 上修改 slaves
文件,添加多个从节点
/export/servers/flink-1.6.0/conf/slaves
node01 上修改 master
文件,添加多个主节点
将 Flink 分发到 node02 和 node03
1 2 scp -r /export /servers/flink-1.6.0/ node02:/export /servers/ scp -r /export /servers/flink-1.6.0/ node03:/export /servers/
node02 上修改 flink-conf.yaml
,将 jobmanager
设置为 node02 的地址
1 2 - jobmanager.rpc.address: node01 + jobmanager.rpc.address: node02
修改环境变量
1 2 export HADOOP_CONF_DIR=/export /servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
重新加载环境变量
启动 前提:启动了 Zookeeper 集群和 HDFS 集群
启动 Flink 集群:
1 /export /servers/flink-1.6.0/bin/start-cluster.sh
查看监控页面: node01 , node02
注:如果访问的页面是备节点的,会重定向到主节点的页面。
YARN
注:Uberjar
,也叫Fatjar
,包含原生jar的所有依赖。Über
为德语,意为over
、abrove
,Uberjar
表示比简单的jar高级
在生产环境中,需要将 Flink 运行到 YARN 集群上,使用 YARN 来管理所有计算资源,不用额外构建 Flink 集群。只需在任意一台机器上提交作业到 YARN 集群即可。
官方教程: YARN Setup
配置 YARN 模式使用 YARN 集群,无需配置 Flink 集群。
注:修改 Hadoop 的 yarn-site.xml
文件,配置为不自动检查内存,以免运行的 Flink 程序超过分配的内存,导致任务被杀掉。
/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/yarn-site.xml 1 2 3 4 <property > <name > yarn.nodemanager.vmem-check-enabled</name > <value > false</value > </property >
启动 前提:启动 HDFS、YARN 集群
启动 Flink Session
Session 将启动所有必需的 Flink 服务(JobManager 和 TaskManagers),以便将程序提交到集群。可以在每个会话中运行多个程序,适用于有大量小作业的场景。
/export/servers/flink-1.6.0/ 1 2 3 4 5 6 7 8 9 10 11 12 bin/yarn-session.sh -tm 800 -s 1 参数: -D <arg> 动态属性 -d,--detached 以后台模式启动 -jm,--jobManagerMemory <arg> 每个JobManager申请多少内存(MB) -nm,--name 在YARN上为应用程序设置自定义名称 -q,--query 显示可用的YARN资源(内存,内核) -qu,--queue <arg> 指定YARN队列 -s,--slots <arg> 每个TaskManager的插槽数 -tm,--taskManagerMemory <arg> 每个TaskManager申请多少内存(MB) -z,--zookeeperNamespace <arg> 命名空间,用于为HA模式创建Zookeeper子路径
查看 YARN 的监控页面: http://node01:8088
如果正在执行计算任务,可以点击 ApplicationMaster 查看任务执行情况。
后台模式启动 YARN Session
如果不希望 Flink YARN 客户端始终保持前台运行,还可以后台启动 YARN Session,使用的参数为-d
或--detached
。在这种情况下,Flink YARN 客户端将仅向集群提交 Flink,然后自行关闭。
注:后台模式下可以使用 yarn application -kill <appId>
来停止 YARN Session:
1 yarn application -kill application_1563879343375_0005
提交作业
运行测试任务,统计词频,使用默认的输入路径和输出路径:
/export/servers/flink-1.6.0/ 1 2 3 4 5 6 bin/flink run examples/batch/WordCount.jar 参数: -c,--class <classname> 指定程序入口类("main" 方法或 "getPlan()" 方法),只有当 JAR 包没有在 manifest 中指定类时需要使用该参数 -m,--jobmanager <host:port> 指定 JobManager(主节点)的地址 -p,--parallelism <parallelism> 设置并行度
查看任务执行情况:
查看输出结果:
Flink编程初体验 需求:编写 Flink 程序,用来统计单词的数量。
使用批处理和流处理两种方式实现需求。
批处理程序
在 IDEA 中创建 Maven 项目
导入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 <properties > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <encoding > UTF-8</encoding > <scala.version > 2.11.2</scala.version > <scala.compat.version > 2.11</scala.compat.version > <hadoop.version > 2.6.0</hadoop.version > <flink.version > 1.6.0</flink.version > </properties > <dependencies > <dependency > <groupId > org.scala-lang</groupId > <artifactId > scala-library</artifactId > <version > ${scala.version}</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-streaming-scala_2.11</artifactId > <version > ${flink.version}</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-scala_2.11</artifactId > <version > ${flink.version}</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-clients_2.11</artifactId > <version > ${flink.version}</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-table_2.11</artifactId > <version > ${flink.version}</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-client</artifactId > <version > ${hadoop.version}</version > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.38</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.22</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-connector-kafka-0.9_2.11</artifactId > <version > ${flink.version}</version > </dependency > </dependencies > <build > <sourceDirectory > src/main/scala</sourceDirectory > <testSourceDirectory > src/test/scala</testSourceDirectory > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 2.5.1</version > <configuration > <source > ${maven.compiler.source}</source > <target > ${maven.compiler.target}</target > </configuration > </plugin > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.2.0</version > <executions > <execution > <goals > <goal > compile</goal > <goal > testCompile</goal > </goals > <configuration > <args > <arg > -dependencyfile</arg > <arg > ${project.build.directory}/.scala_dependencies</arg > </args > </configuration > </execution > </executions > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-surefire-plugin</artifactId > <version > 2.18.1</version > <configuration > <useFile > false</useFile > <disableXmlReport > true</disableXmlReport > <includes > <include > **/*Test.*</include > <include > **/*Suite.*</include > </includes > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-shade-plugin</artifactId > <version > 2.3</version > <executions > <execution > <phase > package</phase > <goals > <goal > shade</goal > </goals > <configuration > <filters > <filter > <artifact > *:*</artifact > <excludes > <exclude > META-INF/*.SF</exclude > <exclude > META-INF/*.DSA</exclude > <exclude > META-INF/*.RSA</exclude > </excludes > </filter > </filters > <transformers > <transformer implementation ="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" > <mainClass > moe.sannaha.batch.BatchWordCountDemo</mainClass > </transformer > </transformers > </configuration > </execution > </executions > </plugin > </plugins > </build >
编写代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import org.apache.flink.api.scala._object BatchWordCountDemo { def main (args: Array [String ]): Unit = { val env = ExecutionEnvironment .getExecutionEnvironment val textDataSet: DataSet [String ] = env.fromCollection( List ("Hadoop,Spark,Flink" , "Flink,Hive,Flume" , "flume,Hadoop,Flink" ) ) val wordDataSet: DataSet [String ] = textDataSet.flatMap(_.split("," )) val wordWithMark: DataSet [(String , Int )] = wordDataSet.map((_,1 )) val groupedDataSet: GroupedDataSet [(String , Int )] = wordWithMark.groupBy(0 ) val sumDataSet: AggregateDataSet [(String , Int )] = groupedDataSet.sum(1 ) sumDataSet.print() } }
查看输出结果
1 2 3 4 5 6 (Hive,1) (Spark,1) (Hadoop,2) (flume,1) (Flink,3) (Flume,1)
流处理程序
在 IDEA 中创建 Maven 项目
导入依赖,与批处理程序相同
编写代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 object StreamWordCountDemo { def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment val socketDataStream = env.socketTextStream("node01" , 9999 ) val wordWithMark: DataStream [(String , Int )] = socketDataStream.flatMap(_.split(" " )).map(_ -> 1 ) val groupDataStream: KeyedStream [(String , Int ), Tuple ] = wordWithMark.keyBy(0 ) val windowDataStream: WindowedStream [(String , Int ), Tuple , GlobalWindow ] = groupDataStream.countWindow(5 , 3 ) val resultDataStream: DataStream [(String , Int )] = windowDataStream.reduce((x, y) => (x._1, x._2 + y._2)) resultDataStream.print() env.execute() } }
启动程序后,在 node01 的 netcat 端发送单词数据,同时在控制台查看输出结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 发送: spark hive spark spark hive spark hive hive hive hive hive spark spark 接收: 4> (hadoop,1) 1> (spark,1) 1> (hive,1) 4> (flink,1) 1> (hive,1) 1> (spark,1) 3> (spring,1) 4> (hadoop,1) 4> (flink,2) 发送: spark hive spark spark hive spark hive hive hive hive hive spark spark 接收: 1> (spark,3) 1> (hive,3) 1> (hive,5) 1> (spark,5)