0%

Spark RDD详解

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。


理解RDD

什么是RDD?
  • Resilient:数据存储可以在内存和磁盘自动切换;节点故障或分区的损坏而丢失的数据可以在血缘(Lineage)的帮助下自动恢复;计算出错可以重试;可以根据需要重新分片;
  • Distributed:数据是分布式存储的,可用于分布式计算;
  • Dataset:封装了计算逻辑的数据集合,本身不存放数据,但操作起来像本地集合一样。

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个弹性的不可变可分区、里面的元素可并行计算集合,它由分区组成,本身不存放数据,只存放数据的引用,但操作 RDD 就像操作本地集合一样。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。

为什么要设计出 RDD 这种数据抽象?

Hadoop 的 MapReduce 是一种基于数据集的工作模式,这种工作模式一般是从存储上加载数据集,然后操作数据集,最后写入物理存储设备,数据更多面临的是一次性处理,这种方式对迭代式的算法交互式数据挖掘这两种数据领域常见的操作不是很高效Spark 应运而生。RDD 是基于工作集的工作模式,更多的是面向工作流。

两种迭代方式

RDD的属性

在 Spark 源码的注释中,阐明了每个 RDD 都有五个主要属性:

RDD.scala
1
2
3
4
5
6
7
8
9
10
/**
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*/

分区列表

一个 Partition(分区)列表。Partition 即数据集的基本组成单位,每个 Partition 都会被一个计算任务处理,Partition 的数量决定计算的并行度。

RDD.scala
1
2
3
4
5
6
7
8
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]

生成Partition

  • 文件在 HDFS 上以 block(块)存储;
  • Spark 根据数据格式对应的 InputFormat 将文件解析为若干个 InputSplit(输入分片),如果文件支持切分,split 的数量等于 block 的数量,如果文件不支持切分(如 gz 格式)就只有一个 split;
  • 每个 split 对应 RDD 的一个 Partition(分区),每个分区对应一个 Task(计算任务)。每个 Task 都会被分配给某个 Executor 的某个 core 去执行;

计算函数

一个对每个 split(分片)进行计算的函数

RDD.scala
1
2
3
4
5
6
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
  • RDD 的计算是以分区为单位的,每个 RDD 都会实现 compute 函数。如果 RDD 是通过已有的文件系统构建的,则 compute 函数读取指定文件系统中的数据;如果 RDD 是通过其他 RDD 转换而来,则 compute 函数执行转换逻辑,对其他 RDD 的数据进行转换。该函数会对迭代器进行复合,不需要保存每次计算的结果。
  • RDD 是只读的,要想改变 RDD 中的数据,只能在现有的 RDD 基础上创建新的 RDD。由一个 RDD 转换到另一个 RDD,可以通过丰富的操作算子实现,不像 MapReduce 那样只能写 map 和 reduce。

依赖关系

RDD 之间的依赖关系。RDD 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDD 衍生所必需的信息,RDD 之间维护着这种依赖关系,称之为血缘。当部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不用对 RDD 的所有分区重新进行计算。

RDD.scala
1
2
3
4
5
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps

分区函数

key-value 型 RDD 的 Partitioner(分区函数)。当前 Spark 中实现了两种类型的分区函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于 key-value 型的 RDD,才会有 Partitioner,非 key-value 型的 RDD 的 Parititioner 值是 None。Partitioner 函数不但决定了 RDD 本身的分区数量,也决定了父 RDD 在 Shuffle 时输出的分区数量。

RDD.scala
1
2
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None

优先位置列表

一个存取每个 split 的 preferred location(优先位置)列表。对于 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的 block 的位置。根据“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到存储要处理数据块所在的节点。

RDD.scala
1
2
3
4
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

查看不同格式的文件对应的 Partition 数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> val rdd1=sc.textFile("hdfs://node01:8020/input/bigFile")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://node01:8020/input/bigFile MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd1.partitions.size
res1: Int = 3

scala> val rdd2=sc.textFile("hdfs://node01:8020/input/solr-4.10.2.zip")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs://node01:8020/input/solr-4.10.2.zip MapPartitionsRDD[5] at textFile at <console>:24
scala> rdd2.partitions.size
res2: Int = 2

scala> val rdd3=sc.textFile("hdfs://node01:8020/input/hadoop-2.6.0-cdh5.14.0.tar.gz")
rdd3: org.apache.spark.rdd.RDD[String] = hdfs://node01:8020/input/hadoop-2.6.0-cdh5.14.0.tar.gz MapPartitionsRDD[7] at textFile at <console>:24
scala> rdd3.partitions.size
res3: Int = 1

RDD的作用

在执行 Spark 计算任务时,首先需要申请资源,然后将应用程序的数据处理逻辑划分成一个个计算任务,之后将任务分发到分配有资源的计算节点上进行计算,最后得到计算结果。在 Yarn 环境中 Spark 程序的运行流程如下:

  1. 启动 Yarn 集群环境:

1-启动Yarn集群环境

  1. Spark 申请资源,选择一些 NodeManager 创建调度节点和计算节点:

2-Spark申请资源

  1. Spark 框架根据需求将计算逻辑根据分区划分成不同的任务:

3-Spark划分任务

  1. 调度节点根据计算节点状态将任务发送到计算节点进行计算:

4-进行计算

RDD 是 Spark 用于数据处理的核心模型,从上面的流程中可以看出 RDD 主要作用是将计算逻辑进行封装,生成 Task 发送给 Executor 节点执行计算。

创建RDD

创建 RDD 有三种方式:

  1. 通过已有的集合创建:
1
2
3
4
// 使用 parallelize() 根据集合创建 RDD
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5))
// 使用 makeRDD() 创建 RDD,该方法调用了 parallelize()
val rdd2 = sc.makeRDD(Array(1, 2, 3, 4, 5))
  1. 通过读取文件创建,支持本地文件系统、HDFS、HBase 等:
1
2
3
4
// 从本地文件系统中创建 RDD
val rdd3 = sc.textFile("data/words.txt")
// 从 HDFS 中创建 RDD
val rdd4 = sc.textFile("hdfs://node01:8020/data/words.txt")
  1. 通过已有的 RDD 经过算子转换,生成新的 RDD:
1
val rdd5 = rdd4.flatMap(_.split(" "))

算子

RDD的算子分为两类:

  • Transformation:转换算子,根据 RDD 创建一个新的 RDD。这类算子都是延迟加载的,它们不会直接执行计算,只是记住转换动作。只有要求返回结果给 Driver 时,这些转换才会真正运行。这种设计让 Spark 的运行更加高效。
  • Action:行动算子,对 RDD 计算后返回一个结果给 Driver,会直接执行计算

Transformation

转换算子 说明
map(f: T => U) 对每个元素执行函数计算,返回MappedRDD[U]map
flatmap(f: T => TraversableOnce[U]) 首先对每个元素执行函数计算,然后将结果展平,返回FlatMappedRDD[U]flatmap
filter(f: T => Boolean) 保留函数计算结果为true的元素,返回FilteredRDD[T]
mapPartitions(Iterator[T] => Iterator[U]) 对每个分区执行函数计算,返回MapPartitionsRDD[U]
sample(withReplacement, fraction, seed) 简单随机抽样,返回采样子集PartitionwiseSampledRDD[T]withReplacement表示是抽出的数据是否放回,fraction表示样本数量,seed表示随机数发生器种子。
union(otherRdd[T]) 对源RDD和参数RDD求并集,返回UnionRDD[T]
intersection(otherRdd[T]) 对源RDD和参数RDD求交集,返回MapPartitionsRDD[T]
distinct([numTasks]) 对源RDD进行去重后,返回新的RDD。numTasks表示并行任务的数量,默认为8。
partitionBy() 对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD数量一致就不进行分区,否则会生成ShuffledRDD
reduceByKey(f: (V, V) => V) 在(K,V)类型的RDD上调用,使用指定的reduce函数将key相同的value聚合到一起,返回ShuffledRDD[(K, V)]。同一台机器上的键值对在移动前会先组合,适合大数据复杂计算。reduceByKey
groupByKey() 在(K,V)类型的RDD上调用,同样对每个key进行操作,但只生成一个sequence,返回ShuffledRDD[([K], Iterable[V])]。所有的键值对都会被移动,网络开销大,不适合大数据复杂计算。groupByKey
sortByKey([ascending], [numTasks]) 在(K,V)类型的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
sortBy(func,[ascending], [numTasks]) sortByKey类似,但是更灵活,可以用func函数先对数据进行处理,按照处理后的数据比较结果排序。
join(otherDataset, [numTasks]) 在(K,V)和(K,W)类型的RDD上调用,返回(K,(V,W))类型的RDD,相同key对应的所有元素对在一起。
cogroup(otherDataset, [numTasks]) 在(K,V)和(K,W)类型的RDD上调用,返回(K,(Iterable,Iterable))类型的RDD。
coalesce(numPartitions,[shuffle]) 将 RDD 的分区数减少到 numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。第二个参数 shuffle 为 Boolean 类型,默认为 false。
repartition(numPartitions) 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于 numPartitions。该算子总是需要通过网络混洗所有数据。底层是调用 coalesce(numPartitions, shuffle = true)

Action

行动算子 说明
reduce(func) 通过func函数聚合RDD中的所有元素,这个功能必须是可交换且可并联的。
collect() 在 Driver 中,以数组的形式返回数据集的所有元素。
count() 返回RDD的元素个数。
first() 返回RDD的第一个元素,类似于 take(1)。
take(n) 返回一个由数据集的前n个元素组成的数组。
takeSample(withReplacement, num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,withReplacement表示是抽出的数据是否放回,num表示样本数量,seed表示随机数发生器种子。
takeOrdered(n, [ordering]) 返回前几个的排序。
aggregate(zeroValue: U)(seqOp: (U,T) ⇒ U, combOp: (U, U) ⇒ U) 先将每个分区里面的元素通过seqOp函数和初始值进行聚合,然后用combOp函数将每个分区的结果和初始值zeroValue进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
fold(num)(func) aggregate的简化操作,seqOpcombOp一样,没有初始值。
saveAsTextFile(path) 将数据集中的元素以textfile的形式保存到指定的目录下。Spark 会调用toString()方法转换每个元素。
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下。
saveAsObjectFile(path) 将数据集中的元素序列化成对象,存储到文件中。
countByKey() 对(K,V)类型的RDD,返回一个(K,Int)的Map,表示每个key对应的元素个数。
foreach(f: T => Unit) 为数据集的每一个元素调用func函数进行处理。
foreachPartition(f: Iterator[T] => Unit) 为数据集的每一个分区调用func函数进行处理。可以用来处理数据库连接等操作。

使用经验

reduceByKey 与 groupByKey 之间如何选择?

reduceByKey 性能更好。reduceByKey 会在 map 端先进行本地 combine,可以大大减少要传输到 reduce 端的数据量,减小网络传输的开销。groupByKey 不会进行本地聚合,只是原封不动的把 ShuffleMapTask 的输出拉取到 ResultTask 的内存中,所有的数据都要进行网络传输,从而导致网络传输的性能开销很大。

只有在 reduceByKey 处理不了时,才用 groupByKey().map() 来替代。

需要改变分区数量,repartition 与 coalesce 之间如何选择?
1
2
repartition(numPartitions:Int):RDD[T]
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

repartition 会创建新的 partition 并使用 full shuffle,因此它是一个消耗昂贵的算子。Spark 还提供了一个经过优化的算子叫作 coalesce,它会利用已有的 partition 来尽量减少数据 shuffle,尽量避免数据迁移。而 repartition 相当于 coalesce 中参数 shuffletrue 的简易实现。

使用时如何选择呢:

  • 如果需要稍微减少分区,可以使用 coalesce(用到 coalesce 的场景默认其 shuffle 参数为 false),它会利用已有的 partition 来减少 shuffle,可以尽量避免数据迁移;
  • 如果要增加分区或减少分区到特别少甚至到 1,可以使用 repartitioncoalesce(1, true),避免 Spark 程序因为并行度不够而影响性能。

假设源 RDD 有 N 个分区,需要重新划分成 M 个分区:

  1. 如果 N < M。一般情况下 N 个分区有数据分布不均匀的状况,利用 HashPartitioner 函数将数据重新分区为 M 个,这时需要将 shuffle 设置为 true。在 shuffle 为 false 的情况下,coalesce 为无效的,不进行 shuffle 过程,父 RDD 和子 RDD 之间是窄依赖关系。
  2. 如果 N > M 并且 N 和 M 相差不多(假如 N = 1000,M = 100),那么就可以将 N 个分区中的若干个分区合并成一个新的分区,最终合并为 M 个分区,这时可以将 shuffle 设置为 false
  3. 如果 N > M 并且两者相差悬殊,这时如果将 shuffle 设置为 false,父子 RDD 是窄依赖关系,他们同处在一个 Stage 中,就可能造成 Spark 程序的并行度不够,从而影响性能。如果在 M = 1 的时候,为了使 coalesce 之前的操作有更好的并行度,可以将 shuffle 设置为 true

注:如果 shuffle 为 false 时,如果传入的参数大于现有的分区数目,RDD 的分区数不变,也就是说不经过 shuffle,是无法将 RDD 的分区数变多的。

map 和 mapPartitions 的区别
  • map:一次处理一个元素的数据
  • mapPartitions:一次处理一批数据

mapPartitions 的优缺点:

  • 优点:速度快,一次处理一批数据,即一次接收所有的 partition 数据,在 map 过程中需要频繁创建额外的对象(例如将 RDD 中的数据通过 jdbc 写入数据库,map 需要为每个元素创建一个连接,而 mapPartitions 为每个 partition 创建一个链接),则 mapPartitions 效率比 map 高的多。
  • 缺点:容易出现内存溢出,当接收的 partition 的数据量较大时,例如 100 万数据, 一次传入一个 function 以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就导致 OOM;而 map 一般较少出现内存溢出。

依赖关系

依赖类型

RDD和它的父RDD的依赖关系有两种类型:窄依赖宽依赖

窄依赖与宽依赖

窄依赖(narrow dependency)指的是父 RDD 的每个 Partition 最多被子 RDD 的一个 Partition 使用,如 mapfilterunionmapPartitions 等操作都会产生窄依赖。

宽依赖(wide dependency)指的是父 RDD 的每个 Partition 都有可能被子 RDD 的多个 Partition 使用,如 groupByKeyreduceByKeysortByKey 等操作都会产生宽依赖。

join 操作既可能是窄依赖也可能是宽依赖

图左的join:两个RDD在进行join操作时,子RDD依赖于两个父RDD,但每个父RDD的每个 Partition 只被子RDD 的一个 Partition 使用,所以是窄依赖的join(join with inputs co-partitioned)。
图右的join:两个RDD在进行join操作时,子RDD的多个 Partition 依赖于父RDD 的同一个 Partition,所以是宽依赖的join(join with inputs not co-partitioned)。子RDD的 Partition 需要父RDD的所有 Partition 进行 join 的转换,涉及到了 shuffle。

Lineage

为了进行容错,RDD 通过所谓的血缘(Lineage)记住了它是如何从其它 RDD 转换过来的。RDD的 Lineage 记录的是粗粒度的,只记录单个块上执行的单个操作。当 RDD 的部分分区数据丢失时,它可以根据 RDD 的元数据信息和转换行为来重新运算和恢复丢失的数据分区。

DAG的生成

什么是DAG?

DAG(Directed Acyclic Graph)叫有向无环图,“有向”指有方向,“无环”指不能有闭环。DAG 描述了多个 RDD 之间的转换关系,RDD 和 RDD 之间是存在依赖关系的,子 RDD 记录了父 RDD 的引用,最后的 RDD 触发 Action。一个 DAG 就是一个 job。

什么是Stage?

Stage的划分

根据 RDD 之间依赖关系的不同将 DAG 划分成不同的 Stage(调度阶段):

  • 窄依赖:Partition 依赖关系是明确的,转换处理可以在同一个线程里完成,窄依赖就被Spark划分到同一个 Stage。
  • 宽依赖:父 RDD 在 Shuffle 完成后,才能开始接下来的计算。宽依赖是划分 Stage 的依据
为什么要划分Stage?

为了并行计算。一个复杂的业务逻辑如果有 shuffle,那么就意味着上一个阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的结果。按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage。在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。

如何划分Stage?
  1. Spark 划分 Stage 的整体思路是:从后往前回溯,遇到宽依赖就断开,划分为一个 Stage,遇到窄依赖就将这个 RDD 加入该 Stage 中。
  2. 每个 Stage 里面 Task 的数量是由该 Stage 中最后一个 RDD 的 Partition 的数量所决定的。
  3. 最后一个 Stage 里面的任务类型是 ResultTask,前面其他所有的 Stage 的任务类型是 ShuffleMapTask。
  4. 代表当前 Stage 的算子一定是该 Stage 的最后一个计算步骤。

划分Stage

Shuffle的过程

宽依赖之间会划分 Stage,而 Stage 之间就是 Shuffle。

Spark 中负责 Shuffle 过程的执行、计算和处理的组件主要是 ShuffleManager。ShuffleManager 随着 Spark 的发展有两种实现的方式,分别为 HashShuffleManager 和 SortShuffleManager,因此 Spark 的 Shuffle 有 Hash-Based Shuffle 和 Sort-Based Shuffle 两种。

Hash-Based Shuffle 和 Sort-Based Shuffle 有什么区别?
  • Hash-Based Shuffle:过时的,会产生大量的中间磁盘文件,影响性能。是 Spark 1.2 之前的策略。
  • Sort-Based Shuffle:每个 Task 会将所有的临时文件合并成一个磁盘文件,又分为普通机制和byPass机制:
  • 普通机制:会对数据进行排序,造成 Shuffle 的速度稍慢。
  • byPass机制:不会对数据进行排序,执行效率较高。

两种机制的介绍详见: spark之shuffle原理及性能优化

任务调度

任务调度流程图

任务调度流程图

各个 RDD 之间的依赖关系形成DAG(有向无环图),DAGScheduler 对这些依赖关系形成的 DAG 进行 Stage 划分:从后往前回溯,遇到宽依赖就断开,划分为一个 Stage,遇到窄依赖就将这个 RDD 加入该 Stage中。完成了 Stage 的划分,DAGScheduler 基于每个 Stage 生成 TaskSet,并将 TaskSet(一组Task)提交给 TaskScheduler。TaskScheduler 负责具体的任务调度,最后在 Worker 节点上启动 Task。

DAGScheduler

  1. DAGScheduler 对 DAG 有向无环图进行Stage划分。
  2. 记录哪个 RDD 或 Stage 输出被缓存。在一个复杂的 Shuffle 之后,通常缓存一下(cache、persist),方便之后的计算。
  3. 重新提交 Shuffle 输出丢失的 Stage(Stage 内部计算出错)给 TaskScheduler。
  4. 将 TaskSet 传给任务调度器:
  • spark-cluster TaskScheduler
  • yarn-cluster YarnClusterScheduler
  • yarn-client YarnClientClusterScheduler

TaskScheduler

  1. 为每一个 TaskSet 构建一个TaskSetManager 实例来管理这个TaskSet 的生命周期。
  2. 根据数据本地性判断运行每个 Task 的最佳位置。
  3. 提交 TaskSet 到集群运行并监控。
  4. 推测执行,遇到计算缓慢任务需要放到其他节点上重试。
  5. 重新提交 Shuffle 输出丢失的 Stage 给 DAGScheduler。

RDD的缓存

Spark 速度快的原因之一就是可以在内存中持久化数据集。如果在应用程序中多次使用同一个 RDD,可以将该 RDD 缓存起来,之后对此 RDD 或衍生出的 RDD 进行操作时可以重用。与RDD相关的持久化和缓存,是 Spark 最重要的特征之一,也是 Spark 构建迭代式算法和快速交互式查询的关键。

注意:缓存不会斩断血缘关系。

使用RDD缓存

RDD通过 persist() 方法或 cache() 方法可以缓存前面的计算结果:

  • cache:只能将数据缓存到内存中。内存缓存不下就不进行缓存,下次执行时直接执行没有缓存到内存中的 Task 即可。
  • persist:可以指定缓存级别。

这两个方法并非在被调用时立即缓存,而要等触发后面的 Action 时才会将该 RDD 缓存在计算节点的内存中。

1
2
3
4
5
val data = sc.textFile(“hdfs://node01:8020/input”)
//使用cache方法,本质上是 data.persist(StorageLevel.MEMORY_ONLY)
data.cache()
//使用persist方法
data.persist(StorageLevel.DISK_ONLY_2)

RDD的缓存级别

缓存级别的说明如下:

Storage Level 说明
MEMORY_ONLY 使用未序列化的 Java 对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则某些分区的数据就不会进行持久化。那么下次对这个 RDD 执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用 cache() 方法时,实际就是使用的这种持久化策略。
MEMORY_ONLY_SER 基本含义同 MEMORY_ONLY。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC。
MYMORY_AND_DISK 使用未序列化的 Java 对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个 RDD 执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_AND_DISK_SER 基本含义同 MEMORY_AND_DISK。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC。
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 对于上述任意一种持久化策略,如果加上后缀 _2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对 RDD 计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
OFF_HEAP(试验性) RDD 的数据序例化之后存储至 Tachyon 。相比于 MEMORY_ONLY_SEROFF_HEAP 能够减少垃圾回收开销、使得 Spark Executor 更“小”更“轻”的同时可以共享内存;而且数据存储于 Tachyon 中,Spark 集群节点故障并不会造成数据丢失,因此这种方式在“大”内存或多并发应用的场景下是很有吸引力的。需要注意的是,Tachyon 并不直接包含于 Spark 的体系之内,需要选择合适的版本进行部署;它的数据是以“块”为单位进行管理的,这些块可以根据一定的算法被丢弃,且不会被重建。

选择缓存级别
MEMORY_ONLY > MEMORY_ONLY_SER > MEMORY_AND_DISK_SER > MEMORY_AND_DISK

默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

源码分析

查看 StorageLevel 类的源码,看一下 RDD 都有哪些缓存级别:

StorageLevel.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class StorageLevel private(
//数据是否保存到磁盘里面
private var _useDisk: Boolean,
//数据是否保存到内存里面
private var _useMemory: Boolean,
//是否使用堆外内存来缓存数据,避免JVM频繁GC
private var _useOffHeap: Boolean,
//是否反序列化
private var _deserialized: Boolean,
//数据保存的复本数
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}

object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}

StorageLevel 类的主构造器包含了 5 个参数:

  1. useDisk:使用硬盘。
  2. useMemory:使用内存。
  3. useOffHeap:使用堆外内存。堆外内存意味着把内存对象分配在Java虚拟机的 Heap 以外的内存,这些内存直接受操作系统管理。这样做的结果就是能保持一个较小的堆,以减少垃圾回收器对应用的影响。
  4. deserialized:反序列化,把字节流恢复为结构化对象。其逆过程是序列化(Serialization),把结构化对象转化为字节流。序列化方式存储对象可以节省磁盘或内存的空间。
  5. replication:备份数(在多个节点上备份)。

根据这五个参数,理解 StorageLevel 的各种缓存级别就不难了:

1
2
//RDD将存储在硬盘和内存中,使用序列化,并且在多个节点上备份2份
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

cache 方法和 persist 方法的联系

查看源码会发现 cache() 调用了 persist()

RDD.scala
1
2
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

RDD的检查点

什么是 Checkpoint(检查点)?为什么使用 Checkpoint?
  1. Spark 在生产环境下经常会面临 transformation 的 RDD 非常多transformation 的 RDD 本身计算特别复杂特别耗时,这时就要考虑对计算结果数据持久化保存。
  2. Spark 擅长多步骤迭代,也擅长基于 Job 的复用。如果能够对曾经计算产生的数据进行复用,可以极大的提升效率。
  3. 如果采用 persist 方法把数据缓存在内存中,虽然快速但易失;如果把数据缓存在磁盘上,则没有这个问题。

Checkpoint 的作用就是为了相对缓存而言更加可靠的持久化数据。Checkpoint 可以指定把数据以多复本的方式放在本地;在生产环境中可以把数据放在 HDFS 上,借助 HDFS 天然的高容错、高可靠的特征来实现可靠的持久化数据

Checkpoint 会斩断之前的血缘关系,Checkpoint 后的 RDD 不知道它的父 RDD 了,从 Checkpoint 处直接拿到数据。对于长时间迭代型应用来说,迭代过程出错也不必通过非常长的血缘关系去重建分区数据了。

Checkpoint 机制的原理是什么?
当 RDD 使用**缓存机制**从内存中读取数据,如果没有读到数据,再使用 **Checkpoint 机制**读取数据,此时如果没有 Checkpoint 数据,那么就需要找到父 RDD **重新计算数据**了。

使用 Checkpoint 首先需要调用 sparkContextsetCheckpointDir() 方法,设置一个容错文件系统目录(如 HDFS),然后对RDD调用 checkpoint() 方法。之后在 RDD 所处的 Job 运行结束后,会启动一个单独的 Job 来将 Checkpoint 过的数据写入之前设置的文件系统。

1
2
3
4
5
6
sc.setCheckpointDir("hdfs://node01:8020/ckpdir") 
//设置检查点目录,会立即在HDFS上创建一个空目录
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd1.checkpoint //对rdd1进行检查点保存
rdd1.collect //Action操作才会真正执行checkpoint
//后续如果要使用到rdd1可以从checkpoint中读取
RDD 的缓存和 Checkpoint 有什么区别?
  1. 存储位置不同:缓存的数据保存在 BlockManager 的内存或磁盘上;Checkpoint 的数据保存在 HDFS 上。
  2. 对血缘的影响不同:缓存的 RDD 的不会丢掉 Lineage,可以通过血缘关系重新计算;Checkpoint 执行完后,已经将当前计算结果安全保存在 HDFS,会斩断 Lineage。
  3. 生命周期不同:缓存的 RDD 会在程序结束后被清除;Checkpoint 保存的 RDD 在程序结束后依然存在于 HDFS。

数据读取与保存

文本文件

读取

1
val input = sc.textFile(inputPath)

注:路径支持通配符;如果传递目录,则将目录下的所有文件读取作为RDD。

将一个文本文件读取为 RDD 时,输入的每一行都会成为 RDD 的一个元素。也可以将多个文本文件一次性读取为一个 pair RDD, 其中 key 是文件名, value 是文件内容。

通过 wholeTextFiles() 对于大量的小文件读取效率比较高,大文件效果没有那么高。

保存

1
result.saveAsTextFile(outputPath)

注:路径作为输出目录,在目录下输出多个文件。

Demo

1
2
3
4
5
6
7
8
9
scala> sc.textFile("./data/wordcount.txt")
res6: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[7] at textFile at <console>:25

scala> val readme = sc.textFile("./data/wordcount.txt")
readme: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[9] at textFile at <console>:24

scala> readme.collect()
res7: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster...
scala> readme.saveAsTextFile("hdfs://node01:8020/test")

JSON文件

读取
如果JSON文件中每一行就是一条JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。如果JSON数据是跨行的,那么只能读入整个文件,然后对每个文件进行解析。

保存
在输出之前将结构化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去,本质还是以文本文件的形式保存。

Demo

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> import org.json4s._  
import org.json4s._

scala> import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.JsonMethods._

scala> import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization

scala> var result = sc.textFile("examples/src/main/resources/people.json")
result: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.json MapPartitionsRDD[7] at textFile at <console>:47

scala> implicit val formats = Serialization.formats(ShortTypeHints(List()))
formats: org.json4s.Formats{val dateFormat: org.json4s.DateFormat; val typeHints: org.json4s.TypeHints} = org.json4s.Serialization$$anon$1@61f2c1da

scala> result.collect()
res3: Array[String] = Array({"name":"Michael"}, {"name":"Andy", "age":30}, {"name":"Justin", "age":19})

CSV文件

读取
读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,然后通过将每一行进行解析实现对CSV的读取。

保存
CSV/TSV 数据的输出也是需要将结构化 RDD 通过相关的库转换成字符串 RDD,然后使用 Spark 的文本文件 API 写出去。

SequenceFile文件

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种文件格式。

读取
Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中调用 sequenceFile[keyClass, valueClass](path)

保存
调用 saveAsSequenceFile(path) 保存 PairRDD。需要 key 和 value 能够自动转为 Writable 类型。

数据类型对比

Demo

1
2
3
4
5
6
7
8
9
10
scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> data.saveAsSequenceFile("hdfs://node01:8020/sequdata")

scala> val sdata = sc.sequenceFile[Int,String]("hdfs://node01:8020/sequdata/*")
sdata: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at sequenceFile at <console>:24

scala> sdata.collect()
res14: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))

Object文件

对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。

读取
调用 objectFile[k,v](path) 读取对象文件,返回对应的 RDD。因为要反序列化所以要指定类型。

保存
调用saveAsObjectFile() 保存对象文件。

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> data.saveAsObjectFile("hdfs://node01:8020/objfile")

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> val objrdd =sc.objectFile[(Int,String)]("hdfs://node01:8020/objfile/p*")
objrdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[28] at objectFile at <console>:25

scala> objrdd.collect()
res20: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))

Hadoop

Spark 的整个生态系统与 Hadoop 完全兼容,对于 Hadoop 所支持的文件类型或者数据库类型,Spark 也同样提供支持。由于 Hadoop 的 API 有新旧两个版本,所以 Spark 也提供了两套接口。

读取
对于读取 Hadoop 存储而言,hadoopRDDnewHadoopRDD 是最为抽象的两个函数接口,主要包含以下四个参数:

  • 输入格式:指定数据输入的类型,如 TextInputFormat 等。
  • 新版:org.apache.hadoop.mapred.InputFormat
  • 旧版:org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
  • 键类型:指定[K,V]键值对中K的类型。
  • 值类型:指定[K,V]键值对中V的类型。
  • 分区值:指定由外部存储生成的 RDD 的 Partition 数量的最小值,如果没有指定,系统会使用默认值 defaultMinSplits

其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本。例如,对于 textFile 而言,只需指定 path 这个文件路径的参数,其他参数在系统内部有默认值。

兼容旧版HadoopAPI的读取操作

文件路径 输入格式 键类型 值类型 分区值
textFile(path:String,minPartitions:Int=defaultMinPartitions) path TextInputFormat LongWritable Text minSplits
hadoopFile[K,V,F <: InputFormat[K,V]] (path:String,minPartitions:Int)(implicit km:ClassTag[K],vm:ClassTag[V],fm:ClassTag[F]):RDD[(K,V)] path F K V minSplits
hadoopFile[K,V,F <: [K,V]] (path:String)(implicit km:ClassTag[K],vm:ClassTag[V],fm:ClassTag[F]):RDD[(K,V)] path F K V DefaultMinSplits
hadoopFile[K,V] (path:String,inputFormatClass:Class[_ <: InputFormat[K,V]], keyClass:Class[K],valueClass:Class[V],minPartitions:Int=defaultMinPartitions):RDD[(K,V)] path inputFormatClass keyClass valueClass defaultMinPartitions
hadoopRDD[K,V] (conf:JobConf,inputFormatClass:Class[_ <: InputFormat[K, V]], keyClass:Class[K],valueClass:Class[V],minPartitions:Int=defaultMinPartitions):RDD[(K, V)] n/a inputFormatClass keyClass valueClass defaultMinPartitions
sequenceFile[K,V] (path:String,minPartitions:Int=defaultMinPartitions)(implicit km:ClassTag[K],vm:ClassTag[V],kcf:()⇒WritableConverter[K], vcf:()⇒WritableConverter[V]):RDD[(K,V)] path SequenceFileInputFormat[K,V] K V defaultMinPartitions
objectFile[T] (path:String,minPartitions:Int=defaultMinPartitions)(implicit arg0:ClassTag[T]):RDD[T] path SequenceFileInputFormat[NullWritable, BytesWritable] NullWritable BytesWritable minSplits

兼容新版HadoopAPI的读取操作

文件路径 输入格式 键类型 值类型 分区值
newAPIHadoopFile[K,V,F <: InputFormat[K,V]] (path:String,fClass:Class[F],kClass:Class[K],vClass:Class[V],conf:Configuration=hadoopConfiguration):RDD[(K,V)] path F K V n/a
newAPIHadoopFile[K,V,F <: InputFormat[K,V]] (path:String)(implicit km:ClassTag[K],vm:ClassTag[V],fm:ClassTag[F]):RDD[(K,V)] path F K V n/a
newAPIHadoopRDD[K,V,F <: InputFormat[K,V]] (conf:Configuration=hadoopConfiguration,fClass:Class[F],kClass:Class[K],vClass:Class[V]):RDD[(K,V)] n/a F K V n/a

保存
将 RDD 保存到 HDFS 中,通常需要设置以下五个参数:

  • 输出路径:指定数据输出的路径。
  • 键类型:指定[K,V]键值对中K的类型。
  • 值类型:指定[K,V]键值对中V的类型。
  • 输出格式:RDD 的输出格式,如 TextOutputFormatSequenceFileOutputFormat
  • codec:数据存储的压缩格式,如 DefaultCodecGzip 等。

兼容旧版HadoopAPI的保存操作
前六个 API 都是 saveAsHadoopDataset 的简易实现,仅支持将 RDD 存储到 HDFS;saveAsHadoopDataset 的参数类型是 JobConf,不仅能够将 RDD 存储到 HDFS 中,也可以将 RDD 存储到其他数据库中,如 HBase、MangoDB 等。

1
2
3
4
5
6
7
saveAsObjectFile(path:String): Unit
saveAsTextFile(path:String, codec: Class[_ <: CompressionCodec]): Unit
saveAsTextFile(path:String): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit
saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
saveAsHadoopDataset(conf: JobConf): Unit

兼容新版HadoopAPI的保存操作
前两个 API 是 saveAsNewAPIHadoopDataset 的简易实现,仅支持将 RDD 存储到 HDFS;saveAsNewAPIHadoopDataset 可以将 RDD 输出到任何 Hadoop 支持的存储系统。新版 API 没有 codec 的参数,所以要压缩文件需要使用 hadoopConfiguration 参数,设置对应 mapreduce.map.output.compress.codec 参数和 mapreduce.map.output.compress 参数。

1
2
3
saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
saveAsNewAPIHadoopDataset(conf: Configuration): Unit

Demo
读取示例:

1
2
3
4
5
6
7
scala> import org.apache.hadoop.io._
import org.apache.hadoop.io._

scala> val data = sc.parallelize(Array((30,"hadoop"), (71,"hive"), (11,"cat")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[47] at parallelize at <console>:35

scala> data.saveAsNewAPIHadoopFile("hdfs://node01:8020/output4/",classOf[LongWritable] ,classOf[Text] ,classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[LongWritable, Text]])

写入示例:

1
2
3
4
5
scala> val read =  sc.newAPIHadoopFile[LongWritable, Text, org.apache.hadoop.mapreduce.lib.input.TextInputFormat]("hdfs://node01:8020/output4/part*", classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat], classOf[LongWritable], classOf[Text])
read: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = hdfs://node01:8020/output4/part* NewHadoopRDD[48] at newAPIHadoopFile at <console>:35

scala> read.map{case (k, v) => v.toString}.collect
res44: Array[String] = Array(30 hadoop, 71 hive, 11 cat)

数据库

MySQL

支持通过 JDBC 访问关系型数据库。需要通过 JdbcRDD 进行数据读取和写入。

读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
val sc = new SparkContext(sparkConf)

val rdd = new org.apache.spark.rdd.JdbcRDD(sc, () => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/rdd", "root", "123456")
}, "select * from rddtable where id >= ? and id <= ?;", //SQL 语句,必须有两个占位符
1, 10, //占位符对应的值
1, //Partition 的个数
r => (r.getInt(1), r.getString(2)))

println(rdd.count())
rdd.foreach(println(_))
sc.stop()
}

写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val data = sc.parallelize(List("Female", "Male", "Female"))

data.foreachPartition(insertData)
}

def insertData(iterator: Iterator[String]): Unit = {
Class.forName("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/rdd", "root", "123456")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}

Cassandra数据库和ElasticSearch集成

项目页面位置

HBase

由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过 Hadoop 输入格式访问 HBase。这个输入格式会返回键值对数据,其中键的类型为 org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为 org.apache.hadoop.hbase.client.Result

导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0-cdh5.14.0</version>
</dependency>

读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)

val conf = HBaseConfiguration.create()
//HBase中的表名
conf.set(TableInputFormat.INPUT_TABLE, "fruit")

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])

val count = hBaseRDD.count()
println("hBaseRDD RDD Count:"+ count)
hBaseRDD.cache()
hBaseRDD.foreach {
case (_, result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
val color = Bytes.toString(result.getValue("info".getBytes, "color".getBytes))
println("Row key:" + key + " Name:" + name + " Color:" + color)
}
sc.stop()
}

写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)

val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")

val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)

def convert(triple: (Int, String, Int)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
val localData = initialRDD.map(convert)

localData.saveAsHadoopDataset(jobConf)
}

程序示例

访问日志分析

需求

根据访问日志 access.log,统计 PV、UV 和访问次数 Top10 的IP。日志数据格式如下:

access.log
1
194.230.140.21 - - [18/Sep/2017:06:49:18 +0000] "GET /content/uploads/2017/07/active.html HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"

导入依赖

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

统计PV

PV(Page View),即页面点击量,用户每次对网站中的每个网页访问均被记录1次PV。用户对同一页面的多次访问,访问量累计。因此日志每条记录可以视为一次PV,统计日志记录的数量便可得到PV。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object PV {
def main(args: Array[String]): Unit = {
//创建 SparkContext 对象,设置 AppName 和 Master
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

//读取数据,给每一行数据打上标记1,统计总和
val file: RDD[String] = sc.textFile("E:\\input\\access.log")
val pvWithMark: RDD[(String, Int)] = file.map(_ => ("pv", 1))
val totalPv: RDD[(String, Int)] = pvWithMark.reduceByKey(_ + _)

//聚合输出
totalPv.foreach(println(_))
sc.stop()
}
}

统计UV

UV(Unique Visitor),即浏览这个网页的访客人数。每个用户对网站的多次访问只被记录1次UV,一般用IP地址标识用户。因此需要对每条日志记录的内容进行分析,得到每条记录的IP,并根据IP去重,得到UV。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object UV {
/**
* 194.230.140.21 - - [18/Sep/2017:06:49:18 +0000] "GET /content/uploads/2017/07/active.html HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
*/
def main(args: Array[String]): Unit = {
//创建 SparkContext 对象,设置 APPName 和 Master
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

//读取数据,切分后根据IP去重,给每一条数据打上标记1,统计总和
val file: RDD[String] = sc.textFile("E:\\input\\access.log")
val ips: RDD[String] = file.map(_.split(" ")(0))
val uvWithMark: RDD[(String, Int)] = ips.distinct().map(_ => ("UV", 1))
val totalUv: RDD[(String, Int)] = uvWithMark.reduceByKey(_ + _)

//聚合输出
totalUv.foreach(println(_))
sc.stop()
}
}

统计Top10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object Top10 {
def main(args: Array[String]): Unit = {
//创建 SparkContext 对象,设置 APPName 和 Master
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

//读取数据,切分后给每一条数据打上标记1,根据IP聚合,统计总和后排序取Top10
val file: RDD[String] = sc.textFile("E:\\input\\access.log")
val ipWithMark: RDD[(String, Int)] = file.map(_.split(" ")(0)).map(x => (x, 1))
val result: Array[(String, Int)] = ipWithMark.reduceByKey(_ + _).sortBy(_._2, false).take(10)

result.foreach(println(_))
sc.stop()
}
}

访客地理位置分析

需求

通过城市热力图可以直观地展示访客所在的地理位置,需要使用 Spark 根据 IP 地址计算出对应的经纬度。

城市热力图

访问日志:

access.log
1
194.230.140.21 - - [18/Sep/2017:06:49:18 +0000] "GET /content/uploads/2017/07/active.html HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"

IP段对应的经纬度:

ip-city.txt
1
114.218.107.0|114.218.107.255|1926916864|1926917119|亚洲|中国|江苏|苏州|昆山|电信|320583|China|CN|120.98074|31.38464

广播变量

根据 ip-city.txt 可以得到IP地址库数据集,该数据集会被其他数据集使用,适合作为广播变量

广播变量可以用来高效分发较大的对象,由 Driver 端定义,在 Executor 端使用。Task 想要使用广播变量中的数据时,会从当前 Executor 节点对应的 BlockManager 中尝试获取变量副本,如果没有就从 Driver 端远程拉取并保存在 BlockManager。广播变量是被 Executor 节点上的所有 Task 共用的,不用每个 Task 携带一份,节省了通信的成本和服务器的资源。

广播变量使用方法如下:

  • 对象 T 调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。
  • 通过 value 属性获得该对象的值。
  • 变量只会被发到各个节点一次,应作为只读值处理。
1
2
3
4
5
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)

scala> broadcastVar.value
res1: Array[Int] = Array(1, 2, 3)

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
object IPtoLocation {
/**
* 根据IP地址查找对应的经纬度信息
* 查找到经纬度信息以后,返回该经纬度对应的访问次数
*/
def main(args: Array[String]): Unit = {
//创建 SparkContext 对象,设置 APPName 和 Master
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

//读取城市IP地址库信息(起始IP, 结束IP, 经度, 纬度)
val ipLib: RDD[(String, String, String, String)] = sc.textFile("E:\\input\\ip-city.txt").map(_.split("\\|")).map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
//读取运营商日志信息,ip
val logs: RDD[String] = sc.textFile("E:\\input\\access.log").map(_.split(" ")(0))

//将要城市IP地址库信息收集到Driver端进行广播
val ipLibBroadCast: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(ipLib.collect())

//遍历运营商日志数据,获取每一个IP地址,并转换为十进制
val locationWithMark: RDD[((String, String), Int)] = logs.mapPartitions(iter => {
//获取广播变量中的值
val ipLibValue: Array[(String, String, String, String)] = ipLibBroadCast.value
//遍历迭代器,获取每一个IP地址
iter.map(ip => {
//将IP地址转换为十进制
val ipNum = ip2Long(ip)
//二分法查找IP地址在IP地址库中的索引
val index: Int = binarySearch(ipNum, ipLibValue)
//对每次出现的经纬度信息标记为1并返回
if (index == -1) {
(("", ""), 0)
} else {
((ipLibValue(index)._3, ipLibValue(index)._4), 1)
}
})
})
//统计相同经纬度出现的总次数
val count: RDD[((String, String), Int)] = locationWithMark.filter(_._2 != 0).reduceByKey(_ + _)
count.foreach(println(_))
//val sorted: RDD[((String, String), Int)] = count.sortBy(_._2, false)
//sorted.foreach(println(_))

sc.stop()
}

//将IP地址转换为long类型
def ip2Long(ip: String): Long = {
val ips = ip.split("\\.")
var ipNum = 0L
//遍历数组
for (i <- ips) {
ipNum = i.toLong | ipNum << 8L
}
ipNum
}

//通过二分查找,找到long类型数字在广播变量数组中的下标
def binarySearch(ipNum: Long, broadcastValue: Array[(String, String, String, String)]): Int = {
var start = 0
var end = broadcastValue.length - 1
while (start <= end) {
val middle = (start + end) / 2
if (ipNum >= broadcastValue(middle)._1.toLong && ipNum <= broadcastValue(middle)._2.toLong) {
return middle
}
if (ipNum < broadcastValue(middle)._1.toLong) {
end = middle - 1
}
if (ipNum > broadcastValue(middle)._2.toLong) {
start = middle + 1
}
}
-1
}
}

参考资料
Spark 2.2.x 中文文档 - RDD 编程指南
Understand RDD Operations: Transformations and Actions 作者:Khoa Nguyen
Spark学习之路 (三)Spark之RDD 作者:扎心了,老铁
coalesce 与 repartition的区别 作者:姜小嫌
Spark性能优化指南——基础篇 作者:李雪蕤(美团技术团队)
Spark性能优化指南——高级篇 作者:李雪蕤(美团技术团队)