0%

Spark快速上手

Spark头图

Spark 是专为大规模数据处理而设计的快速通用的计算引擎。对比 Hadoop 的 MapReduce 会在运行完工作后将中间数据存放到磁盘中,Spark 可以将 Job 中间输出结果保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。


概述

Spark 是专为大规模数据处理而设计的快速通用的计算引擎。对于 Hadoop 的 MapReduce 会在运行完工作后将中间数据存放到磁盘中,Spark 可以将 Job 中间输出结果保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。

Spark 是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

特点

:与 Hadoop 2.x 的 MapReduce 相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。

易用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

通用:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

兼容:Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。

架构

主要模块

Spark主要模块

  • Spark Core:核心模块,用于进行离线数据分析,相当于 MapReduce;包含了对 RDD 的 API 定义。
  • Spark SQL:用来操作结构化数据的程序包,可以使用 SQL 或者 HiveQL 来查询数据;支持多种数据源,如 Hive 表、Parquet 以及 JSON 等。
  • Spark Streaming:对实时数据进行流式计算的组件,提供了用来操作数据流的 API,并且与 Spark Core 中的 RDD API 高度对应。
  • Spark MLlib:分布式机器学习框架,MLlib 可以使用许多常见的机器学习和统计算法,简化大规模机器学习。
  • Spark GraphX:分布式图形处理框架,它提供了一组可用于表达图表计算并可以模拟 Pregel 抽象化的 API,GraphX 还对这种抽象化提供了优化运行。
  • 集群管理器:Spark 支持在各种集群管理器上运行,包括 Hadoop YARN、Apache Mesos,以及 Spark 集群自带的独立调度器。

主要角色

Spark主要角色

  • Master:主节点。接收客户端请求;管理和调度资源 整个集群中最多只有一个 Master 节点处于 Active 状态
  • Worker:从节点。默认会占用该节点的所有资源,会导致资源竞争,从而引起OOM(Out of Memory)
  • Executor:任务的执行者,会将任务按照阶段(stage)来执行
  • Stage:根据 RDD 的依赖关系(宽依赖,窄依赖)进行划分
  • Task:任务执行的一个模块
  • Driver:客户端,SparkContext 驱动

安装

运行模式

Spark 支持多种运行模式:

  • local:本地模式,用于开发测试。
  • StandAlone:独立部署模式,使用 Spark 自带的集群,由 Master 和 Slaver 组成。
  • Spark on YARN:将 Spark 运行在 YARN 集群上,由 YARN 统一管理分配资源,不需要额外构建 Spark 集群,用于生产环境。
  • Spark on Mesos:将 Spark 运行在 Mesos 集群上,由 YARN 统一管理分配资源,不需要额外构建 Spark 集群,用于生产环境。

开发环境

  1. 三台虚拟机
/etc/hosts
1
2
3
192.168.153.100 node01
192.168.153.101 node02
192.168.153.102 node03
  1. 目录结构
1
2
/export/softwares:存放软件安装包
/export/servers:软件安装目录
  1. 软件环境
1
2
zookeeper:zookeeper-3.4.5-cdh5.14.0
hadoop:hadoop-2.6.0-cdh5.14.0

解压

/export/softwares/
1
tar -zxf spark-2.2.0-bin-2.6.0-cdh5.14.0.tgz -C /export/servers/

Local

使用本地线程来模拟 Spark 的程序运行:

  • local[k]:使用 k 个线程来模拟 Spark 的运行
  • local[*]:使用与 CPU 核数相等个线程来模拟 Spark 的运行

配置

修改配置文件名,local 模式无需修改内容:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
1
cp spark-env.sh.template spark-env.sh

启动

启动验证,启动成功后输入:quit退出 Spark shell 客户端:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# bin/spark-shell --master local
...
Spark context Web UI available at http://192.168.153.100:4040
Spark context available as 'sc' (master = local, app id = local-1561281726479).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :quit

测试

执行 Spark 示例程序,使用蒙特卡洛法迭代计算 100 次来求圆周率的值:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
2
3
4
5
6
7
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

Spark_Local_SparkPi

提交命令参数信息详见 #提交任务

蒙特卡洛法求圆周率:让计算机每次随机生成两个0到1之间的数,看以这两个实数为横纵坐标的点是否在单位圆内。生成一系列随机点,统计单位圆内的点数与总点数

StandAlone

构建一个由 Master 和 Slaver 组成的 Spark 集群。

修改配置文件

  1. node01 修改 spark-env.sh
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
1
2
3
4
5
6
# cp spark-env.sh.template spark-env.sh
# vim spark-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_211
export SPARK_MASTER_HOST=node01
export SPARK_MASTER_PORT=7077
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
  1. node01 修改 slaves
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
1
2
3
4
# cp slaves.template slaves
# vim slaves
node02
node03
  1. node01 修改 spark-defaults.conf
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
1
2
3
4
5
# cp spark-defaults.conf.template spark-defaults.conf
# vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node01:8020/spark_log
spark.eventLog.compress true

为了方便 Spark 程序的开发调试,一般都会配置 Spark 的运行日志,将 Spark 程序的运行日志保存到 HDFS 上。在 HDFS 上创建日志文件存放目录:

1
hdfs dfs -mkdir -p /spark_log

分发 Spark

将 node01 上配置好 StandAlone 模式的 Spark 分发到 node02 和 node03:

/export/servers/
1
2
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0/ node02:$PWD
scp -r spark-2.2.0-bin-2.6.0-cdh5.14.0/ node03:$PWD

启动

node01 执行以下命令启动 Spark 程序:

1
2
3
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
sbin/start-all.sh
sbin/start-history-server.sh

浏览器页面访问 Spark: http://node01:8080/
查看 Spark 任务的历史日志: http://node01:4000/

node01 执行以下命令进入 spark-shell:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
bin/spark-shell --master spark://node01:7077

测试

执行 Spark 示例程序,使用蒙特卡洛法迭代计算100次来求圆周率的值:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
2
3
4
5
6
7
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

停止

node01 执行以下命令停止 Spark 集群:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
2
sbin/stop-all.sh
sbin/stop-history-server.sh

StandAlone HA

构建一个由 Master 和 Slaver 组成的 Spark 集群,并且配置为高可用(HA)来解决 master 单节点故障问题,需要借助 Zookeeper 实现自动切换。

配置

  1. node01 服务器修改spark-env.sh
1
2
3
4
5
6
7
# cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf
# vim spark-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_141
- export SPARK_MASTER_HOST=node01
export SPARK_MASTER_PORT=7077
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
+ export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
  1. node01 修改 slaves(在 StandAlone 模式已经修改过)
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
1
2
3
4
# cp slaves.template slaves
# vim slaves
node02
node03
  1. node01 修改spark-defaults.conf(在 StandAlone 模式已经修改过)

为了方便 Spark 程序的开发调试,一般都会配置 Spark 的运行日志,将 Spark 程序的运行日志保存到 HDFS 上:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
1
2
3
4
5
6
# cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf
# cp spark-defaults.conf.template spark-defaults.conf
# vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node01:8020/spark_log
spark.eventLog.compress true

在 HDFS 上创建日志文件存放目录(在 StandAlone 模式已经创建过)

1
hdfs dfs -mkdir -p /spark_log

分发

在 node01 上将 Spark 分发到 node02 和 node03:

1
2
3
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf
scp spark-env.sh node02:$PWD
scp spark-env.sh node03:$PWD

启动

  1. 使用脚本启动 Zookeeper 集群:
/export/servers/zookeeper-3.4.5-cdh5.14.0/startZkServers.sh
1
2
3
4
5
6
7
#!/bin/bash
echo "启动zookeeper集群中..."
for host in node01 node02 node03
do
ssh -q $host "source /etc/profile; /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start"
done
echo "启动完成..."
  1. 在 node01 上启动 Spark 集群
1
2
3
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
sbin/start-all.sh
sbin/start-history-server.sh
  1. node02 启动第二个 master 节点
1
2
cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
sbin/start-master.sh

浏览器页面访问: http://node01:8080/ http://node02:8080/

ALIVE
STANDBY

node01 执行以下命令进入spark-shell:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
bin/spark-shell --master spark://node01:7077,node02:7077

测试

执行 spark 示例程序,使用蒙特卡洛法迭代计算100次来求圆周率的值:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
2
3
4
5
6
7
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077,node02:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

Spark on YARN

Spark on YARN 模式官方文档说明: http://spark.apache.org/docs/latest/running-on-yarn.html

在生产环境中,需要将 Spark 运行在 YARN 集群上,使用 YARN 来管理所有计算资源,不用额外构建 Spark 集群。只需在任意一台机器配置 Spark 客户端(Driver),用于提交任务到 YARN 集群即可。

配置

  1. 如果 YARN 集群资源不够,可以在三台服务器的yarn-site.xml当中添加以下两个配置来跳过 YARN 集群资源的检查,然后重启 YARN 集群。
/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/yarn-site.xml
1
2
3
4
5
6
7
8
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

在 node01 上重启 YARN 集群:

1
stop-yarn.sh && start-yarn.sh
  1. 任意找一台机器修改spark-env.sh
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/spark-env.sh
1
2
3
4
5
6
+ HADOOP_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
+ YARN_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
export JAVA_HOME=/usr/java/jdk1.8.0_141
export SPARK_MASTER_PORT=7077
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"

测试

提交任务到 YARN 集群,deploy-mode 可以选择两种模式:clientcluster。默认为 client模式,实际使用时通常选择 cluster,此时 Driver 端是运行在 NodeManager 上。

执行 Spark 示例程序,使用蒙特卡洛法迭代计算 10 次来求圆周率的值:

  • client 模式

Spark_onYARN_client

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
2
3
4
5
6
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10

如果 deploy-modeclient,控制台会打印以下内容:

Spark_onYARN_SparkPi_client

  • cluster 模式

Spark_onYARN_cluster

1
2
3
4
5
6
7
8
9
10
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10

如果 deploy-modecluster,控制台会打印以下内容:

Spark_onYARN_SparkPi

通过给出的追踪页面中查看任务的执行情况:

Spark_onYARN_SparkPi_trackingURL

点击 logs 打开日志,查看计算结果:

Spark_onYARN_SparkPi

如果查看 logs 时出现问题,比如出现下图所示的错误:

log server problem

尝试以下步骤:

  1. add to mapred-site.xml
mapred-site.xml
1
2
3
4
5
6
7
8
9
<property>
<name>mapreduce.jobhistory.address</name>
<value>node01:10020</value>
</property>

<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>node01:19888</value>
</property>
  1. add to yarn-site.xml
yarn-site.xml
1
2
3
4
<property>
<name>yarn.log.server.url</name>
<value>http://node01:19888/jobhistory/logs</value>
</property>
  1. start history server with mr-jobhistory-daemon.sh start historyserver

提交任务

一旦将应用打包好,就可以使用bin/spark-submit脚本启动应用了。这个脚本负责设置 Spark 使用的 classpath 和依赖,支持不同类型的集群管理器和发布模式:

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
2
3
4
5
6
7
8
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key=value> \
... # other options
<application-jar> \
[application-arguments]

常用选项:

  • --class <main-class>:要启动的类的全路径(如org.apache.spark.examples.SparkPi)
  • --master <master-url>:集群的 master URL (如spark://node01:7077)
  • --deploy-mode <deploy-mode>:发布到worker节点(cluster) 或者作为一个本地客户端 (client)默认为 client
  • --conf <key=value>:任意的Spark配置属性, 格式key=value. 如果 value 中包含空格,可以加引号“key=value”
  • <application-jar>:打包好的应用jar,包含依赖。这个jar需要被集群全局可见,如在hdfs://共享存储系统中,如果是file://path,那么所有的节点的路径下都应包含同样的jar
  • <application-arguments>:传给main()方法的参数

master-url可以是以下格式:

参数 说明
local 本地以一个worker线程运行
local[K] 本地以 K 个 worker 线程 (理想情况下, K设置为你机器的CPU核数)
local[*] 本地以等于CPU核数的线程运行
spark://HOST:PORT 连接到指定的 Spark 集群。PORT为master集群配置的端口,默认为7077
mesos://HOST:PORT 连接到指定的 Mesos 集群。PORT为 mesos 端口,默认为5050。如果Mesos使用ZooKeeper,格式为mesos://zk://...
yarn-client 以client模式连接到 YARN cluster。集群的位置基于HADOOP_CONF_DIR 变量找到
yarn-cluster 以cluster模式连接到 YARN cluster。集群的位置基于HADOOP_CONF_DIR 变量找到

其他可能用到的参数:

  • --driver-memory MEM:Driver 程序使用内存大小
  • --executor-memory MEM:executor 内存大小,默认1G
  • --driver-cores NUM:Driver 程序使用的 CPU 个数,仅限于 Spark Alone 模式
  • --total-executor-cores NUM:executor 使用的总核数,仅限于 Spark Alone、Spark on Mesos 模式
  • –-executor-cores NUM:每个 executor 使用的内核数,默认为1,仅限于 Spark on YARN 模式
  • –-queue QUEUE_NAME:提交应用程序给哪个 YARN 的队列,默认是 default 队列,仅限于 Spark on YARN 模式
  • –-num-executors NUM:启动的 executor 数量,默认是 2 个,仅限于 Spark on YARN 模式

提交到 Local:

1
2
3
4
5
6
7
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

提交到 HA:

1
2
3
4
5
6
7
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077,node02:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

提交到 YARN:

1
2
3
4
5
6
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
100

Spark Shell

目标:通过Spark Shell编写一个词频统计程序,使用Spark的本地模式统计本地的文件中单词出现的次数并按次数倒序排序

准备工作:创建word.txt用作词频统计的数据源,内容如下:

/export/data/word.txt
1
2
3
hadoop spark zookeeper
spark scala
hadoop impala
  1. 运行spark-shell --master local[2],启动Spark的本地模式,使用2个线程跑任务,只运行一个SparkSubmit进程
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
bin/spark-shell --master local[2]
  1. 编写scala词频统计代码,查看结果
1
2
scala> sc.textFile("file:///export/data/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(-_._2).collect
res0: Array[(String, Int)] = Array((spark,2), (hadoop,2), (scala,1), (zookeeper,1), (impala,1))

代码说明:

  • sc:Spark-Shell中已经默认将SparkContext类初始化为对象sc,使用时直接调用sc即可
  • textFile:读取文件
  • flatMap:对文件中的每一行数据进行压平切分,这里按照空格分隔
  • map:对每个单词标记一个数字1,生成对偶元组(word,1)
  • reduceByKey:对相同的单词出现的次数进行累加
  • collect:触发任务执行,收集结果数据

简单的Spark程序

Scala

Scala 版本的词频统计程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* scala 版本的词频统计程序
* 需求:传递两个参数,一个参数是源文件的路径,一个参数是数据结果路径
*/
object ScalaWordCount {
def main(args: Array[String]): Unit = {
/**
* SparkContect是我们的程序入口,如果想开发SparkCore应用程序首先需要创建SparkContext对象
* 主要是实例化了两个对象:
* 1)DAGScheduler:构建有向无环图
* 2)TaskScheduler:将任务递交到集群
*/
val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

//将传入的参数赋值给遍历
//方法一
val inputPath = args(0)
val outputPath = args(1)
//方法二
//val Array(inputPath,outputPath)=args

//通过 sc 读取数据,指定数据源
val lines: RDD[String] = sc.textFile(inputPath)

//将读取到的每行数据按空格切分,得到单词集合,过滤掉内容为空的,压平
val words: RDD[String] = lines.flatMap(_.split(" ")).filter(!_.isEmpty)

//每个单词记一次数,组成一个对偶元组
val wordWithMark: RDD[(String, Int)] = words.map((_, 1))

//以单词为 key 进行分组聚合,对出现次数进行累加
val reduced: RDD[(String, Int)] = wordWithMark.reduceByKey((x, y) => x + y)

//根据出现次数倒序排序
val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)

//将数据保存到指定路径
val result: Array[(String, Int)] = sorted.collect()
result.foreach(println(_))
sorted.saveAsTextFile(outputPath)

//释放资源
sc.stop()
}
}

在 IDEA 运行 Scala 程序可以在此处设置 main() 方法需要的参数,多个参数需要使用空格隔开:

WordCount程序参数设置

Java

Java 版本的词频统计程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class JavaWordCount {
public static void main(String[] args) {
/**
* 1:创建SparkContext对象
* 2:指定读取数据源
* 3:对读取到的数据源进行算子操作
* 4:将计算好的结果输出到指定目录
*/
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("JavaWordCount");
JavaSparkContext jsc = new JavaSparkContext(conf);

//通过 jsc 读取数据,指定数据源
JavaRDD<String> lines = jsc.textFile("E:/test/input/test01.txt");

//压平
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});

//过滤掉空字段
JavaRDD<String> filtered = words.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return !s.isEmpty();
}
});

//每个单词记一次数
JavaPairRDD<String, Integer> wordWithMark = filtered.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<>(word, 1);
}
});

//分组聚合
JavaPairRDD<String, Integer> reduced = wordWithMark.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) throws Exception {
return count1 + count2;
}
});

/**
* 根据出现次数倒序排序(Java 只有 sortByKey() 方法)
* 1. 交换 key-value
* 2. 根据 key 进行排序
* 3. 再次交换 key-value
*/
//1. 交换 key-value
JavaPairRDD<Integer, String> swapped1 = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2) throws Exception {
//return new Tuple2<>(tuple2._2, tuple2._1);
return tuple2.swap();
}
});

//2. 根据 key 进行排序
JavaPairRDD<Integer, String> sorted = swapped1.sortByKey(false);

//3. 再次交换 key-value
JavaPairRDD<String, Integer> swapped2 = (JavaPairRDD<String, Integer>) sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception {
return tuple2.swap();
}
});

//将数据收集到 Driver 端
List<Tuple2<String, Integer>> result = swapped2.collect();

//打印
for (Tuple2<String, Integer> tuple2 : result) {
System.out.println(tuple2);
}

//释放资源
jsc.stop();
}
}

Java Lambda

使用 JDK8 支持的 Lambda 表达式编写词频统计程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class LambdaWordCount {
public static void main(String[] args) {
/**
* 1:创建SparkContext对象
* 2:指定读取数据源
* 3:对读取到的数据源进行算子操作
* 4:将计算好的结果输出到指定目录
*/
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("JavaWordCount");
JavaSparkContext jsc = new JavaSparkContext(conf);

//通过 jsc 读取数据,指定数据源
JavaRDD<String> lines = jsc.textFile("E:/test/input/test01.txt");

//压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

//过滤掉空字段
JavaRDD<String> filtered = words.filter(word -> !word.isEmpty());

//每个单词记一次数
JavaPairRDD<String, Integer> wordWithMark = filtered.mapToPair(word -> new Tuple2<>(word, 1));

//分组聚合
JavaPairRDD<String, Integer> reduced = wordWithMark.reduceByKey((x, y) -> (x + y));

/**
* 根据出现次数倒序排序(Java 只有 sortByKey() 方法)
* 1. 交换 key-value
* 2. 根据 key 进行排序
* 3. 再次交换 key-value
*/
//1. 交换 key-value
JavaPairRDD<Integer, String> swapped1 = reduced.mapToPair(tuple2 -> tuple2.swap());

//2. 根据 key 进行排序
JavaPairRDD<Integer, String> sorted = swapped1.sortByKey(false);

//3. 再次交换 key-value
JavaPairRDD<String, Integer> swapped2 = sorted.mapToPair(tuple2 -> tuple2.swap());

//将数据收集到 Driver 端
List<Tuple2<String, Integer>> result = swapped2.collect();

//打印
for (Tuple2<String, Integer> tuple2 : result) {
System.out.println(tuple2);
}

//释放资源
jsc.stop();
}
}
  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/Spark/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!