Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎,能更好地适用于数据挖掘与机器学习等需要迭代的场景。
概述 Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。Spark 和 MapReduce 的根本差异是多个作业之间的数据通信问题:MapReduce 会在运行完工作后将中间数据存放到磁盘中;Spark 可以将 Job 中间输出结果保存在内存中,从而不再需要读写 HDFS。因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的场景。
特点 快 :与 Hadoop 2.x 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快10倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过内存来高效处理数据流。
易用 :Spark 支持 Java、Python 和 Scala 的API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 Shell,可以非常方便地在这些 Shell 中使用 Spark 来验证解决问题的方法。
通用 :Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark 统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
兼容 :Spark 可以非常方便地与其他的开源产品进行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器,并且可以处理所有 Hadoop 支持的数据,包括 HDFS、HBase 等,对于已经部署 Hadoop 集群的用户,不需要做任何数据迁移就可以使用 Spark 的强大处理能力。Spark 也可以不依赖于第三方的资源管理和调度器,它实现了 Standalone 作为其内置的资源管理和调度框架,这样进一步降低了 Spark 的使用门槛,使得所有人都可以轻松部署和使用 Spark。
在绝大多数的数据计算场景中,Spark 会比 MapReduce 更有优势。但是 Spark 是基于内存的,在实际的生产环境中,可能会由于内存资源不够导致 Job 执行失败,这种场景下 MapReduce 是一个更好的选择,因此 Spark 并不能完全替代 MapReduce 。
架构 主要模块
Spark Core:核心模块,提供了 Spark 最基础与最核心的功能,相当于 MapReduce,可以用于进行离线数据分析;Spark Core 包含了对 RDD 的 API 定义,Spark SQL、Spark Streaming、MLlib、GraphX 都是在 Spark Core 的基础上进行扩展的;
Spark SQL:用来操作结构化数据的组件,可以使用 SQL 或 HiveQL 来查询数据;支持多种数据源,如 Hive 表、Parquet 以及 JSON 等。
Spark Streaming:对实时数据进行流式计算的组件,提供了用来操作数据流的 API,并且与 Spark Core 中的 RDD API 高度对应。
MLlib:分布式机器学习框架,MLlib 可以使用许多常见的机器学习和统计算法,简化大规模机器学习。
GraphX:分布式图形处理框架,它提供了一组可用于表达图表计算并可以模拟 Pregel 抽象化的 API,GraphX 还对这种抽象化提供了优化运行。
集群管理器:Spark 支持在各种集群管理器上运行,包括 YARN、Mesos,以及 Spark 集群自带的独立调度器。
主要角色
Master:主节点。接收客户端请求;管理和调度资源 整个集群中最多只有一个 Master 节点处于 Active 状态
Worker:从节点。默认会占用该节点的所有资源,会导致资源竞争,从而引起OOM(Out of Memory)
Executor:任务的执行者,会将任务按照阶段(stage)来执行
Stage:根据 RDD 的依赖关系(宽依赖,窄依赖)进行划分
Task:任务执行的一个模块
Driver:客户端,SparkContext 驱动
安装 运行模式 Spark 支持多种运行模式:
local:本地模式,用于开发测试。
StandAlone:独立部署模式,使用 Spark 自带的集群,由 Master 和 Slaver 组成。
Spark on YARN:将 Spark 运行在 YARN 集群上,由 YARN 统一管理分配资源,不需要额外构建 Spark 集群,用于生产环境。
Spark on Mesos:将 Spark 运行在 Mesos 集群上,由 YARN 统一管理分配资源,不需要额外构建 Spark 集群,用于生产环境。
开发环境
三台虚拟机
/etc/hosts 1 2 3 192.168.153.100 node01 192.168.153.101 node02 192.168.153.102 node03
目录结构
1 2 /export/softwares:存放软件安装包 /export/servers:软件安装目录
软件环境
1 2 zookeeper:zookeeper-3.4.5-cdh5.14.0 hadoop:hadoop-2.6.0-cdh5.14.0
解压 /export/softwares/ 1 tar -zxf spark-2.2.0-bin-2.6.0-cdh5.14.0.tgz -C /export /servers/
Local 使用本地线程来模拟 Spark 的程序运行:
local[n]
:使用 n 个线程来模拟 Spark 的运行;
local[*]
:使用与运行环境中 CPU 核数相等个线程来模拟 Spark 的运行。
配置 重命名配置文件,local 模式无需修改配置内容:
spark/conf/ 1 $ cp spark-env.sh.template spark-env.sh
启动 启动 spark-shell,输入 :quit
可以退出:
spark/bin/ 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 $ ./spark-shell --master local ... Spark context Web UI available at http://192.168.153.100:4040 Spark context available as 'sc' (master = local , app id = local-1561281726479). Spark session available as 'spark' . Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.0 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141) Type in expressions to have them evaluated. Type :help for more information. scala> :quit
测试 执行 Spark 示例程序,使用蒙特卡洛法迭代计算 100 次来求圆周率的值:
spark/bin/ 1 2 3 4 5 6 7 $ ./spark-submit \ --class org.apache.spark.examples.SparkPi \ --master local [2] \ --executor-memory 1G \ --total-executor-cores 2 \ /export /servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \ 100
提交命令参数信息详见 #提交任务 。
蒙特卡洛法求圆周率:让计算机每次随机生成两个0到1之间的数,看以这两个实数为横纵坐标的点是否在单位圆内。生成一系列随机点,统计单位圆内的点数与总点数
StandAlone 构建一个由 Master 和 Slaver 组成的 Spark 集群。
配置
node01 修改 spark-env.sh
:
spark/conf/ 1 2 3 4 5 6 $ cp spark-env.sh.template spark-env.sh $ vim spark-env.sh export JAVA_HOME=/usr/java/jdk1.8.0_211export SPARK_MASTER_HOST=node01export SPARK_MASTER_PORT=7077export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log"
node01 修改 slaves
:
spark/conf/ 1 2 3 4 $ cp slaves.template slaves $ vim slaves node02 node03
node01 修改 spark-defaults.conf
:
spark/conf/ 1 2 3 4 5 $ cp spark-defaults.conf.template spark-defaults.conf $ vim spark-defaults.conf spark.eventLog.enabled true spark.eventLog.dir hdfs://node01:8020/spark_log spark.eventLog.compress true
为了方便 Spark 程序的开发调试,一般都会配置 Spark 的运行日志,将 Spark 程序的运行日志保存到 HDFS 上。在 HDFS 上创建日志文件存放目录:
1 $ hdfs dfs -mkdir /spark_log
分发 将 node01 上配置好 StandAlone 模式的 Spark 分发到 node02 和 node03:
/app/ 1 2 $ scp -r spark/ node02:$PWD $ scp -r spark/ node03:$PWD
启动 node01 执行以下命令启动 Spark:
spark/sbin 1 2 $ ./start-all.sh $ ./start-history-server.sh
浏览器访问 Spark: http://node01:8080/ 查看 Spark 任务的历史日志: http://node01:4000/
node01 执行以下命令进入 spark-shell:
spark/bin/ 1 $ ./spark-shell --master spark://node01:7077
测试 执行 Spark 示例程序,使用蒙特卡洛法迭代计算100次来求圆周率的值:
spark/bin/ 1 2 3 4 5 6 7 $ ./spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node01:7077 \ --executor-memory 1G \ --total-executor-cores 2 \ /export /servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \ 100
停止 node01 执行以下命令停止 Spark 集群:
spark/ 1 2 $ sbin/stop-all.sh $ sbin/stop-history-server.sh
StandAlone HA 构建一个由 Master 和 Slaver 组成的 Spark 集群,并且配置为高可用(HA)来解决 master 单节点故障问题,需要借助 Zookeeper 实现自动切换。
配置
node01 修改 spark-env.sh
:
spark/conf/spark-env.sh 1 2 3 4 5 export JAVA_HOME=/usr/java/jdk1.8.0_141 - export SPARK_MASTER_HOST=node01 export SPARK_MASTER_PORT=7077 export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log" + export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
node01 修改 slaves
:
spark/conf/ 1 2 3 4 $ cp slaves.template slaves $ vim slaves node02 node03
node01 修改 spark-defaults.conf
:
spark/conf/ 1 2 3 4 5 $ cp spark-defaults.conf.template spark-defaults.conf $ vim spark-defaults.conf spark.eventLog.enabled true spark.eventLog.dir hdfs://node01:8020/spark_log spark.eventLog.compress true
在 HDFS 上创建日志文件存放目录:
1 $ hdfs dfs -mkdir /spark_log
分发 将 node01 将 Spark 分发到 node02 和 node03:
/app/ 1 2 $ scp -r spark/ node02:$PWD $ scp -r spark/ node03:$PWD
启动
编写脚本启动 Zookeeper 集群:
spark/startZkServers.sh 1 2 3 4 5 6 7 #!/bin/bash echo "启动zookeeper集群中..." for host in node01 node02 node03do ssh -q $host "source /etc/profile; /app/zookeeper/bin/zkServer.sh start" done echo "启动完成..."
在 node01 上启动 Spark 集群和历史服务:
spark/ 1 2 $ sbin/start-all.sh $ sbin/start-history-server.sh
node02 启动第二个 master 节点:
spark/
浏览器页面访问: http://node01:8080/ , http://node02:8080/
node01 执行以下命令进入spark-shell:
spark/bin/ 1 $ ./spark-shell --master spark://node01:7077,node02:7077
测试 执行 spark 示例程序,使用蒙特卡洛法迭代计算100次来求圆周率的值:
spark/bin/ 1 2 3 4 5 6 7 $ ./spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node01:7077,node02:7077 \ --executor-memory 1G \ --total-executor-cores 2 \ /app/spark/examples/jars/spark-examples_2.11-2.2.0.jar \ 100
Spark on YARN Spark on YARN 模式官方文档说明: http://spark.apache.org/docs/latest/running-on-yarn.html
在生产环境中,需要将 Spark 运行在 YARN 集群上,使用 YARN 来管理所有计算资源,不用额外构建 Spark 集群。只需在任意一台机器配置 Spark 客户端(Driver),用于提交任务到 YARN 集群即可。
配置
如果虚拟机提供给 YARN 集群的资源不够,可以在三台服务器的 yarn-site.xml
当中添加以下两个配置来跳过 YARN 集群资源的检查,然后重启 YARN 集群:
hadoop/etc/hadoop/yarn-site.xml 1 2 3 4 5 6 7 8 <property > <name > yarn.nodemanager.pmem-check-enabled</name > <value > false</value > </property > <property > <name > yarn.nodemanager.vmem-check-enabled</name > <value > false</value > </property >
在 node01 上重启 YARN 集群:
1 stop-yarn.sh && start-yarn.sh
任意一台安装 Spark 的服务器修改 spark-env.sh
即可用于提交任务到 YARN:
spark/conf/spark-env.sh 1 2 3 4 5 6 + HADOOP_CONF_DIR=/app/hadoop/etc/hadoop + YARN_CONF_DIR=/app/hadoop/etc/hadoop export JAVA_HOME=/usr/java/jdk1.8.0_141 export SPARK_MASTER_PORT=7077 export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node01:8020/spark_log" export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
测试 提交任务到 YARN 集群,可以选择两种部署模式(deploy-mode ):客户端 client
和集群 cluster
。默认为 client
模式,此时提交任务的服务器作为 Driver 端;但生产中通常选择 cluster
模式,让 Driver 端是运行在 NodeManager 上。
client模式
spark/bin/ 1 2 3 4 5 6 $ ./spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ /app/spark/examples/jars/spark-examples_2.11-2.2.0.jar \ 10
client
模式下控制台会打印以下内容:
cluster模式
spark/bin/ 1 2 3 4 5 6 7 8 9 10 $ ./spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 \ --queue thequeue \ /app/spark/examples/jars/spark-examples_2.11-2.2.0.jar \ 10
cluster
模式下控制台会打印以下内容:
需要通过给出的追踪页面中查看任务的执行情况:
点击 logs 打开日志,查看计算结果:
如果查看 logs 时出现问题,比如出现下图所示的错误:
尝试以下方式来解决:
add to mapred-site.xml:
mapred-site.xml 1 2 3 4 5 6 7 8 9 <property > <name > mapreduce.jobhistory.address</name > <value > node01:10020</value > </property > <property > <name > mapreduce.jobhistory.webapp.address</name > <value > node01:19888</value > </property >
add to yarn-site.xml:
yarn-site.xml 1 2 3 4 <property > <name > yarn.log.server.url</name > <value > http://node01:19888/jobhistory/logs</value > </property >
start history server with mr-jobhistory-daemon.sh start historyserver
。
提交任务 将应用打包好,就可以使用 spark-submit 提交任务了。在提交任务时可以设置 Spark 使用的 classpath 和依赖,支持多种集群管理器和部署模式:
spark/bin/ 1 2 3 4 5 6 7 8 $ ./spark-submit \ --class <main-class> --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key=value> \ ... <application-jar> \ [application-arguments]
常用选项:
--class <main-class>
:要启动的类的全路径(如org.apache.spark.examples.SparkPi
)
--master <master-url>
:集群的 master URL (如spark://node01:7077
)
--deploy-mode <deploy-mode>
:发布到worker节点(cluster
) 或者作为一个本地客户端 (client
)默认为 client
--conf <key=value>
:任意的Spark配置属性, 格式key=value
. 如果 value 中包含空格,可以加引号“key=value”
<application-jar>
:打包好的应用jar,包含依赖。这个jar需要被集群全局可见,如在hdfs://
共享存储系统中,如果是file://path
,那么所有的节点的路径下都应包含同样的jar
<application-arguments>
:传给main()
方法的参数
master-url
可以是以下格式:
参数
说明
local
本地以一个worker线程运行
local[K]
本地以 K 个 worker 线程 (理想情况下, K设置为你机器的CPU核数)
local[*]
本地以等于CPU核数的线程运行
spark://HOST:PORT
连接到指定的 Spark 集群。PORT
为master集群配置的端口,默认为7077
mesos://HOST:PORT
连接到指定的 Mesos 集群。PORT
为 mesos 端口,默认为5050
。如果Mesos使用ZooKeeper,格式为mesos://zk://...
yarn-client
以client模式连接到 YARN cluster。集群的位置基于HADOOP_CONF_DIR
变量找到
yarn-cluster
以cluster模式连接到 YARN cluster。集群的位置基于HADOOP_CONF_DIR
变量找到
其他可能用到的参数:
--driver-memory MEM
:Driver 程序使用内存大小
--executor-memory MEM
:executor 内存大小,默认1G
--driver-cores NUM
:Driver 程序使用的 CPU 个数,仅限于 Spark Alone 模式
--total-executor-cores NUM
:executor 使用的总核数,仅限于 Spark Alone、Spark on Mesos 模式
–-executor-cores NUM
:每个 executor 使用的内核数,默认为1,仅限于 Spark on YARN 模式
–-queue QUEUE_NAME
:提交应用程序给哪个 YARN 的队列,默认是 default
队列,仅限于 Spark on YARN 模式
–-num-executors NUM
:启动的 executor 数量,默认是 2 个,仅限于 Spark on YARN 模式
提交到 Local:
1 2 3 4 5 6 7 bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master local [2] \ --executor-memory 1G \ --total-executor-cores 2 \ /export /servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \ 100
提交到 HA:
1 2 3 4 5 6 7 bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node01:7077,node02:7077 \ --executor-memory 1G \ --total-executor-cores 2 \ /export /servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \ 100
提交到 YARN:
1 2 3 4 5 6 bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ /export /servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \ 100
词频统计程序示例 Spark Shell 通过 Spark Shell 编写并运行一个词频统计程序,使用 Spark 的本地模式统计本地文件中单词出现的次数并按次数倒序排序。创建 word.txt
内容如下:
/data/word.txt 1 2 3 hello sannaha hi sannaha hello world
运行 spark-shell 启动本地模式,使用 2 个线程执行任务:
spark/bin/ 1 ./spark-shell --master local [2]
使用 Scala 语言编写词频统计代码,计算并查看结果:
1 2 3 4 5 scala> sc.textFile("file:///data/word.txt" ).flatMap(_.split(" " )).map((_,1 )).reduceByKey(_+_).sortBy(-_._2).collect().foreach(println) (sannaha,2 ) (hello,2 ) (world,1 ) (hi,1 )
说明:
sc
:spark-shell 启动时已经初始化 SparkContext
类对象 sc
;
textFile
:读取文件生成 RDD;
flatMap
:对文件数据进行压平并按照空格切分;
map
:对每个单词标记一个数字 1
,生成对偶元组 (word,1)
;
reduceByKey
:按照单词进行聚合,并对出现的次数进行累加;
sordBy
:按照出现次数倒序排序;
collect
:触发任务的执行 ,收集结果数据;
foreach
:遍历结果并打印。
Scala Scala 版本的词频统计程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 object ScalaWordCount { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName("ScalaWordCount" ).setMaster("local[*]" ) val sc: SparkContext = new SparkContext (conf) val inputPath = args(0 ) val outputPath = args(1 ) val lines: RDD [String ] = sc.textFile(inputPath) val words: RDD [String ] = lines.flatMap(_.split(" " )).filter(!_.isEmpty) val wordWithMark: RDD [(String , Int )] = words.map((_, 1 )) val reduced: RDD [(String , Int )] = wordWithMark.reduceByKey((x, y) => x + y) val sorted: RDD [(String , Int )] = reduced.sortBy(_._2, false ) val result: Array [(String , Int )] = sorted.collect() result.foreach(println(_)) sorted.saveAsTextFile(outputPath) sc.stop() } }
在 IDEA 运行 Scala 程序可以在此处设置 main()
方法需要的参数,多个参数需要使用空格隔开:
Java Java 版本的词频统计程序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 public class JavaWordCount { public static void main (String[] args) { SparkConf conf = new SparkConf().setMaster("local[*]" ).setAppName("JavaWordCount" ); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("data/word.txt" ); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call (String line) throws Exception { return Arrays.asList(line.split(" " )).iterator(); } }); JavaRDD<String> filtered = words.filter(new Function<String, Boolean>() { @Override public Boolean call (String s) throws Exception { return !s.isEmpty(); } }); JavaPairRDD<String, Integer> wordWithMark = filtered.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call (String word) throws Exception { return new Tuple2<>(word, 1 ); } }); JavaPairRDD<String, Integer> reduced = wordWithMark.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call (Integer count1, Integer count2) throws Exception { return count1 + count2; } }); JavaPairRDD<Integer, String> swapped1 = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call (Tuple2<String, Integer> tuple2) throws Exception { return tuple2.swap(); } }); JavaPairRDD<Integer, String> sorted = swapped1.sortByKey(false ); JavaPairRDD<String, Integer> swapped2 = (JavaPairRDD<String, Integer>) sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call (Tuple2<Integer, String> tuple2) throws Exception { return tuple2.swap(); } }); List<Tuple2<String, Integer>> result = swapped2.collect(); for (Tuple2<String, Integer> tuple2 : result) { System.out.println(tuple2); } jsc.stop(); } }
Java Lambda 使用 JDK8 支持的 Lambda 表达式编写词频统计程序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class LambdaWordCount { public static void main (String[] args) { SparkConf conf = new SparkConf().setMaster("local[*]" ).setAppName("JavaWordCount" ); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("data/word.txt" ); JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" " )).iterator()); JavaRDD<String> filtered = words.filter(word -> !word.isEmpty()); JavaPairRDD<String, Integer> wordWithMark = filtered.mapToPair(word -> new Tuple2<>(word, 1 )); JavaPairRDD<String, Integer> reduced = wordWithMark.reduceByKey((x, y) -> (x + y)); JavaPairRDD<Integer, String> swapped1 = reduced.mapToPair(tuple2 -> tuple2.swap()); JavaPairRDD<Integer, String> sorted = swapped1.sortByKey(false ); JavaPairRDD<String, Integer> swapped2 = sorted.mapToPair(tuple2 -> tuple2.swap()); List<Tuple2<String, Integer>> result = swapped2.collect(); for (Tuple2<String, Integer> tuple2 : result) { System.out.println(tuple2); } jsc.stop(); } }