0%

Spark快速上手

Spark头图

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎,能更好地适用于数据挖掘与机器学习等需要迭代的场景。


概述

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。Spark 和 MapReduce 的根本差异是多个作业之间的数据通信问题:MapReduce 会在运行完工作后将中间数据存放到磁盘中;Spark 可以将 Job 中间输出结果保存在内存中,从而不再需要读写 HDFS。因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的场景。

特点

:与 Hadoop 2.x 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快10倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过内存来高效处理数据流。

易用:Spark 支持 Java、Python 和 Scala 的API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 Shell,可以非常方便地在这些 Shell 中使用 Spark 来验证解决问题的方法。

通用:Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark 统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

兼容:Spark 可以非常方便地与其他的开源产品进行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器,并且可以处理所有 Hadoop 支持的数据,包括 HDFS、HBase 等,对于已经部署 Hadoop 集群的用户,不需要做任何数据迁移就可以使用 Spark 的强大处理能力。Spark 也可以不依赖于第三方的资源管理和调度器,它实现了 Standalone 作为其内置的资源管理和调度框架,这样进一步降低了 Spark 的使用门槛,使得所有人都可以轻松部署和使用 Spark。

在绝大多数的数据计算场景中,Spark 会比 MapReduce 更有优势。但是 Spark 是基于内存的,在实际的生产环境中,可能会由于内存资源不够导致 Job 执行失败,这种场景下 MapReduce 是一个更好的选择,因此 Spark 并不能完全替代 MapReduce 。

架构

主要模块

Spark主要模块

  • Spark Core:核心模块,提供了 Spark 最基础与最核心的功能,相当于 MapReduce,可以用于进行离线数据分析;Spark Core 包含了对 RDD 的 API 定义,Spark SQL、Spark Streaming、MLlib、GraphX 都是在 Spark Core 的基础上进行扩展的;
  • Spark SQL:用来操作结构化数据的组件,可以使用 SQL 或 HiveQL 来查询数据;支持多种数据源,如 Hive 表、Parquet 以及 JSON 等。
  • Spark Streaming:对实时数据进行流式计算的组件,提供了用来操作数据流的 API,并且与 Spark Core 中的 RDD API 高度对应。
  • MLlib:分布式机器学习框架,MLlib 可以使用许多常见的机器学习和统计算法,简化大规模机器学习。
  • GraphX:分布式图形处理框架,它提供了一组可用于表达图表计算并可以模拟 Pregel 抽象化的 API,GraphX 还对这种抽象化提供了优化运行。
  • 集群管理器:Spark 支持在各种集群管理器上运行,包括 YARN、Mesos,以及 Spark 集群自带的独立调度器。

主要角色

Spark主要角色

  • Master:主节点。接收客户端请求;管理和调度资源 整个集群中最多只有一个 Master 节点处于 Active 状态
  • Worker:从节点。默认会占用该节点的所有资源,会导致资源竞争,从而引起OOM(Out of Memory)
  • Executor:任务的执行者,会将任务按照阶段(stage)来执行
  • Stage:根据 RDD 的依赖关系(宽依赖,窄依赖)进行划分
  • Task:任务执行的一个模块
  • Driver:客户端,SparkContext 驱动

安装

运行模式

Spark 支持多种运行模式:

  • local:本地模式,用于开发测试。
  • StandAlone:独立部署模式,使用 Spark 自带的集群,由 Master 和 Slaver 组成。
  • Spark on YARN:将 Spark 运行在 YARN 集群上,由 YARN 统一管理分配资源,不需要额外构建 Spark 集群,用于生产环境。
  • Spark on Mesos:将 Spark 运行在 Mesos 集群上,由 YARN 统一管理分配资源,不需要额外构建 Spark 集群,用于生产环境。

开发环境

  1. 三台虚拟机
/etc/hosts
1
2
3
192.168.153.100 node01
192.168.153.101 node02
192.168.153.102 node03
  1. 目录结构
1
2
/export/softwares:存放软件安装包
/export/servers:软件安装目录
  1. 软件环境
1
2
zookeeper:zookeeper-3.4.5-cdh5.14.0
hadoop:hadoop-2.6.0-cdh5.14.0

解压

/export/softwares/
1
tar -zxf spark-2.2.0-bin-2.6.0-cdh5.14.0.tgz -C /export/servers/

Local

使用本地线程来模拟 Spark 的程序运行:

  • local[n]:使用 n 个线程来模拟 Spark 的运行;
  • local[*]:使用与运行环境中 CPU 核数相等个线程来模拟 Spark 的运行。

配置

重命名配置文件,local 模式无需修改配置内容:

spark/conf/
1
$ cp spark-env.sh.template spark-env.sh

启动

启动 spark-shell,输入 :quit 可以退出:

spark/bin/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ ./spark-shell --master local
...
Spark context Web UI available at http://192.168.153.100:4040
Spark context available as 'sc' (master = local, app id = local-1561281726479).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :quit

测试

执行 Spark 示例程序,使用蒙特卡洛法迭代计算 100 次来求圆周率的值:

spark/bin/
1
2
3
4
5
6
7
$ ./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

Spark_Local_SparkPi

提交命令参数信息详见 #提交任务

蒙特卡洛法求圆周率:让计算机每次随机生成两个0到1之间的数,看以这两个实数为横纵坐标的点是否在单位圆内。生成一系列随机点,统计单位圆内的点数与总点数

StandAlone

构建一个由 Master 和 Slaver 组成的 Spark 集群。

配置

  1. node01 修改 spark-env.sh
spark/conf/
1
2
3
4
5
6
$ cp spark-env.sh.template spark-env.sh
$ vim spark-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_211
export SPARK_MASTER_HOST=node01
export SPARK_MASTER_PORT=7077
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
  1. node01 修改 slaves
spark/conf/
1
2
3
4
$ cp slaves.template slaves
$ vim slaves
node02
node03
  1. node01 修改 spark-defaults.conf
spark/conf/
1
2
3
4
5
$ cp spark-defaults.conf.template spark-defaults.conf
$ vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node01:8020/spark_log
spark.eventLog.compress true

为了方便 Spark 程序的开发调试,一般都会配置 Spark 的运行日志,将 Spark 程序的运行日志保存到 HDFS 上。在 HDFS 上创建日志文件存放目录:

1
$ hdfs dfs -mkdir /spark_log

分发

将 node01 上配置好 StandAlone 模式的 Spark 分发到 node02 和 node03:

/app/
1
2
$ scp -r spark/ node02:$PWD
$ scp -r spark/ node03:$PWD

启动

node01 执行以下命令启动 Spark:

spark/sbin
1
2
$ ./start-all.sh
$ ./start-history-server.sh

浏览器访问 Spark: http://node01:8080/
查看 Spark 任务的历史日志: http://node01:4000/

node01 执行以下命令进入 spark-shell:

spark/bin/
1
$ ./spark-shell --master spark://node01:7077

测试

执行 Spark 示例程序,使用蒙特卡洛法迭代计算100次来求圆周率的值:

spark/bin/
1
2
3
4
5
6
7
$ ./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

停止

node01 执行以下命令停止 Spark 集群:

spark/
1
2
$ sbin/stop-all.sh
$ sbin/stop-history-server.sh

StandAlone HA

构建一个由 Master 和 Slaver 组成的 Spark 集群,并且配置为高可用(HA)来解决 master 单节点故障问题,需要借助 Zookeeper 实现自动切换。

配置

  1. node01 修改 spark-env.sh
spark/conf/spark-env.sh
1
2
3
4
5
export JAVA_HOME=/usr/java/jdk1.8.0_141
- export SPARK_MASTER_HOST=node01
export SPARK_MASTER_PORT=7077
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
+ export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
  1. node01 修改 slaves
spark/conf/
1
2
3
4
$ cp slaves.template slaves
$ vim slaves
node02
node03
  1. node01 修改 spark-defaults.conf
spark/conf/
1
2
3
4
5
$ cp spark-defaults.conf.template spark-defaults.conf
$ vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node01:8020/spark_log
spark.eventLog.compress true

在 HDFS 上创建日志文件存放目录:

1
$ hdfs dfs -mkdir /spark_log

分发

将 node01 将 Spark 分发到 node02 和 node03:

/app/
1
2
$ scp -r spark/ node02:$PWD
$ scp -r spark/ node03:$PWD

启动

  1. 编写脚本启动 Zookeeper 集群:
spark/startZkServers.sh
1
2
3
4
5
6
7
#!/bin/bash
echo "启动zookeeper集群中..."
for host in node01 node02 node03
do
ssh -q $host "source /etc/profile; /app/zookeeper/bin/zkServer.sh start"
done
echo "启动完成..."
  1. 在 node01 上启动 Spark 集群和历史服务:
spark/
1
2
$ sbin/start-all.sh
$ sbin/start-history-server.sh
  1. node02 启动第二个 master 节点:
spark/
1
$ sbin/start-master.sh

浏览器页面访问: http://node01:8080/ http://node02:8080/

ALIVE
STANDBY

node01 执行以下命令进入spark-shell:

spark/bin/
1
$ ./spark-shell --master spark://node01:7077,node02:7077

测试

执行 spark 示例程序,使用蒙特卡洛法迭代计算100次来求圆周率的值:

spark/bin/
1
2
3
4
5
6
7
$ ./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077,node02:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/app/spark/examples/jars/spark-examples_2.11-2.2.0.jar \
100

Spark on YARN

Spark on YARN 模式官方文档说明: http://spark.apache.org/docs/latest/running-on-yarn.html

在生产环境中,需要将 Spark 运行在 YARN 集群上,使用 YARN 来管理所有计算资源,不用额外构建 Spark 集群。只需在任意一台机器配置 Spark 客户端(Driver),用于提交任务到 YARN 集群即可。

配置

  1. 如果虚拟机提供给 YARN 集群的资源不够,可以在三台服务器的 yarn-site.xml 当中添加以下两个配置来跳过 YARN 集群资源的检查,然后重启 YARN 集群:
hadoop/etc/hadoop/yarn-site.xml
1
2
3
4
5
6
7
8
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

在 node01 上重启 YARN 集群:

1
stop-yarn.sh && start-yarn.sh
  1. 任意一台安装 Spark 的服务器修改 spark-env.sh 即可用于提交任务到 YARN:
spark/conf/spark-env.sh
1
2
3
4
5
6
+ HADOOP_CONF_DIR=/app/hadoop/etc/hadoop
+ YARN_CONF_DIR=/app/hadoop/etc/hadoop
export JAVA_HOME=/usr/java/jdk1.8.0_141
export SPARK_MASTER_PORT=7077
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"

测试

提交任务到 YARN 集群,可以选择两种部署模式(deploy-mode ):客户端 client 和集群 cluster。默认为 client模式,此时提交任务的服务器作为 Driver 端;但生产中通常选择 cluster 模式,让 Driver 端是运行在 NodeManager 上。

client模式

Spark_onYARN_client

spark/bin/
1
2
3
4
5
6
$ ./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/app/spark/examples/jars/spark-examples_2.11-2.2.0.jar \
10

client 模式下控制台会打印以下内容:

Spark_onYARN_SparkPi_client

cluster模式

Spark_onYARN_cluster

spark/bin/
1
2
3
4
5
6
7
8
9
10
$ ./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
/app/spark/examples/jars/spark-examples_2.11-2.2.0.jar \
10

cluster 模式下控制台会打印以下内容:

Spark_onYARN_SparkPi

需要通过给出的追踪页面中查看任务的执行情况:

Spark_onYARN_SparkPi_trackingURL

点击 logs 打开日志,查看计算结果:

Spark_onYARN_SparkPi

如果查看 logs 时出现问题,比如出现下图所示的错误:

log server problem

尝试以下方式来解决:

  1. add to mapred-site.xml:
mapred-site.xml
1
2
3
4
5
6
7
8
9
<property>
<name>mapreduce.jobhistory.address</name>
<value>node01:10020</value>
</property>

<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>node01:19888</value>
</property>
  1. add to yarn-site.xml:
yarn-site.xml
1
2
3
4
<property>
<name>yarn.log.server.url</name>
<value>http://node01:19888/jobhistory/logs</value>
</property>
  1. start history server with mr-jobhistory-daemon.sh start historyserver

提交任务

将应用打包好,就可以使用 spark-submit 提交任务了。在提交任务时可以设置 Spark 使用的 classpath 和依赖,支持多种集群管理器和部署模式:

spark/bin/
1
2
3
4
5
6
7
8
$ ./spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key=value> \
... # other options
<application-jar> \
[application-arguments]

常用选项:

  • --class <main-class>:要启动的类的全路径(如org.apache.spark.examples.SparkPi)
  • --master <master-url>:集群的 master URL (如spark://node01:7077)
  • --deploy-mode <deploy-mode>:发布到worker节点(cluster) 或者作为一个本地客户端 (client)默认为 client
  • --conf <key=value>:任意的Spark配置属性, 格式key=value. 如果 value 中包含空格,可以加引号“key=value”
  • <application-jar>:打包好的应用jar,包含依赖。这个jar需要被集群全局可见,如在hdfs://共享存储系统中,如果是file://path,那么所有的节点的路径下都应包含同样的jar
  • <application-arguments>:传给main()方法的参数

master-url可以是以下格式:

参数 说明
local 本地以一个worker线程运行
local[K] 本地以 K 个 worker 线程 (理想情况下, K设置为你机器的CPU核数)
local[*] 本地以等于CPU核数的线程运行
spark://HOST:PORT 连接到指定的 Spark 集群。PORT为master集群配置的端口,默认为7077
mesos://HOST:PORT 连接到指定的 Mesos 集群。PORT为 mesos 端口,默认为5050。如果Mesos使用ZooKeeper,格式为mesos://zk://...
yarn-client 以client模式连接到 YARN cluster。集群的位置基于HADOOP_CONF_DIR 变量找到
yarn-cluster 以cluster模式连接到 YARN cluster。集群的位置基于HADOOP_CONF_DIR 变量找到

其他可能用到的参数:

  • --driver-memory MEM:Driver 程序使用内存大小
  • --executor-memory MEM:executor 内存大小,默认1G
  • --driver-cores NUM:Driver 程序使用的 CPU 个数,仅限于 Spark Alone 模式
  • --total-executor-cores NUM:executor 使用的总核数,仅限于 Spark Alone、Spark on Mesos 模式
  • –-executor-cores NUM:每个 executor 使用的内核数,默认为1,仅限于 Spark on YARN 模式
  • –-queue QUEUE_NAME:提交应用程序给哪个 YARN 的队列,默认是 default 队列,仅限于 Spark on YARN 模式
  • –-num-executors NUM:启动的 executor 数量,默认是 2 个,仅限于 Spark on YARN 模式

提交到 Local:

1
2
3
4
5
6
7
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

提交到 HA:

1
2
3
4
5
6
7
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077,node02:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

提交到 YARN:

1
2
3
4
5
6
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

词频统计程序示例

Spark Shell

通过 Spark Shell 编写并运行一个词频统计程序,使用 Spark 的本地模式统计本地文件中单词出现的次数并按次数倒序排序。创建 word.txt 内容如下:

/data/word.txt
1
2
3
hello sannaha
hi sannaha
hello world
  1. 运行 spark-shell 启动本地模式,使用 2 个线程执行任务:
spark/bin/
1
./spark-shell --master local[2]
  1. 使用 Scala 语言编写词频统计代码,计算并查看结果:
1
2
3
4
5
scala> sc.textFile("file:///data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(-_._2).collect().foreach(println)
(sannaha,2)
(hello,2)
(world,1)
(hi,1)

说明:

  • sc:spark-shell 启动时已经初始化 SparkContext 类对象 sc
  • textFile:读取文件生成 RDD;
  • flatMap:对文件数据进行压平并按照空格切分;
  • map:对每个单词标记一个数字 1,生成对偶元组 (word,1)
  • reduceByKey:按照单词进行聚合,并对出现的次数进行累加;
  • sordBy:按照出现次数倒序排序;
  • collect触发任务的执行,收集结果数据;
  • foreach:遍历结果并打印。

Scala

Scala 版本的词频统计程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* scala 版本的词频统计程序
* 需求:传递两个参数,一个参数是源文件的路径,一个参数是数据结果路径
*/
object ScalaWordCount {
def main(args: Array[String]): Unit = {
/**
* SparkContect是我们的程序入口,如果想开发SparkCore应用程序首先需要创建SparkContext对象
* 主要是实例化了两个对象:
* 1)DAGScheduler:构建有向无环图
* 2)TaskScheduler:将任务递交到集群
*/
val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

//将传入的参数赋值给遍历
//方法一
val inputPath = args(0)
val outputPath = args(1)
//方法二
//val Array(inputPath,outputPath)=args

//通过 sc 读取数据,指定数据源
val lines: RDD[String] = sc.textFile(inputPath)

//将读取到的每行数据按空格切分,得到单词集合,过滤掉内容为空的,压平
val words: RDD[String] = lines.flatMap(_.split(" ")).filter(!_.isEmpty)

//每个单词记一次数,组成一个对偶元组
val wordWithMark: RDD[(String, Int)] = words.map((_, 1))

//以单词为 key 进行分组聚合,对出现次数进行累加
val reduced: RDD[(String, Int)] = wordWithMark.reduceByKey((x, y) => x + y)

//根据出现次数倒序排序
val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)

//将数据保存到指定路径
val result: Array[(String, Int)] = sorted.collect()
result.foreach(println(_))
sorted.saveAsTextFile(outputPath)

//释放资源
sc.stop()
}
}

在 IDEA 运行 Scala 程序可以在此处设置 main() 方法需要的参数,多个参数需要使用空格隔开:

WordCount程序参数设置

Java

Java 版本的词频统计程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class JavaWordCount {
public static void main(String[] args) {
/**
* 1:创建SparkContext对象
* 2:指定读取数据源
* 3:对读取到的数据源进行算子操作
* 4:将计算好的结果输出到指定目录
*/
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("JavaWordCount");
JavaSparkContext jsc = new JavaSparkContext(conf);

//通过 jsc 读取数据,指定数据源
JavaRDD<String> lines = jsc.textFile("data/word.txt");

//压平
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});

//过滤掉空字段
JavaRDD<String> filtered = words.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return !s.isEmpty();
}
});

//每个单词记一次数
JavaPairRDD<String, Integer> wordWithMark = filtered.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<>(word, 1);
}
});

//分组聚合
JavaPairRDD<String, Integer> reduced = wordWithMark.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) throws Exception {
return count1 + count2;
}
});

/**
* 根据出现次数倒序排序(Java 只有 sortByKey() 方法)
* 1. 交换 key-value
* 2. 根据 key 进行排序
* 3. 再次交换 key-value
*/
//1. 交换 key-value
JavaPairRDD<Integer, String> swapped1 = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2) throws Exception {
//return new Tuple2<>(tuple2._2, tuple2._1);
return tuple2.swap();
}
});

//2. 根据 key 进行排序
JavaPairRDD<Integer, String> sorted = swapped1.sortByKey(false);

//3. 再次交换 key-value
JavaPairRDD<String, Integer> swapped2 = (JavaPairRDD<String, Integer>) sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception {
return tuple2.swap();
}
});

//将数据收集到 Driver 端
List<Tuple2<String, Integer>> result = swapped2.collect();

//打印
for (Tuple2<String, Integer> tuple2 : result) {
System.out.println(tuple2);
}

//释放资源
jsc.stop();
}
}

Java Lambda

使用 JDK8 支持的 Lambda 表达式编写词频统计程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class LambdaWordCount {
public static void main(String[] args) {
/**
* 1:创建SparkContext对象
* 2:指定读取数据源
* 3:对读取到的数据源进行算子操作
* 4:将计算好的结果输出到指定目录
*/
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("JavaWordCount");
JavaSparkContext jsc = new JavaSparkContext(conf);

//通过 jsc 读取数据,指定数据源
JavaRDD<String> lines = jsc.textFile("data/word.txt");

//压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

//过滤掉空字段
JavaRDD<String> filtered = words.filter(word -> !word.isEmpty());

//每个单词记一次数
JavaPairRDD<String, Integer> wordWithMark = filtered.mapToPair(word -> new Tuple2<>(word, 1));

//分组聚合
JavaPairRDD<String, Integer> reduced = wordWithMark.reduceByKey((x, y) -> (x + y));

/**
* 根据出现次数倒序排序(Java 只有 sortByKey() 方法)
* 1. 交换 key-value
* 2. 根据 key 进行排序
* 3. 再次交换 key-value
*/
//1. 交换 key-value
JavaPairRDD<Integer, String> swapped1 = reduced.mapToPair(tuple2 -> tuple2.swap());

//2. 根据 key 进行排序
JavaPairRDD<Integer, String> sorted = swapped1.sortByKey(false);

//3. 再次交换 key-value
JavaPairRDD<String, Integer> swapped2 = sorted.mapToPair(tuple2 -> tuple2.swap());

//将数据收集到 Driver 端
List<Tuple2<String, Integer>> result = swapped2.collect();

//打印
for (Tuple2<String, Integer> tuple2 : result) {
System.out.println(tuple2);
}

//释放资源
jsc.stop();
}
}
  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/Spark/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!