RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。
理解RDD
- Resilient:数据存储可以在内存和磁盘自动切换;节点故障或分区的损坏而丢失的数据可以在血缘(Lineage)的帮助下自动恢复;计算出错可以重试;可以根据需要重新分片;
- Distributed:数据是分布式存储的,可用于分布式计算;
- Dataset:封装了计算逻辑的数据集合,本身不存放数据,但操作起来像本地集合一样。
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合,它由分区组成,本身不存放数据,只存放数据的引用,但操作 RDD 就像操作本地集合一样。RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度。
Hadoop 的 MapReduce 是一种基于数据集的工作模式,这种工作模式一般是从存储上加载数据集,然后操作数据集,最后写入物理存储设备,数据更多面临的是一次性处理,这种方式对迭代式的算法和交互式数据挖掘这两种数据领域常见的操作不是很高效Spark 应运而生。RDD 是基于工作集的工作模式,更多的是面向工作流。
RDD的属性
在 Spark 源码的注释中,阐明了每个 RDD 都有五个主要属性:
1 | /** |
分区列表
一个 Partition(分区)列表。Partition 即数据集的基本组成单位,每个 Partition 都会被一个计算任务处理,Partition 的数量决定计算的并行度。
1 | /** |
- 文件在 HDFS 上以 block(块)存储;
- Spark 根据数据格式对应的
InputFormat
将文件解析为若干个 InputSplit(输入分片),如果文件支持切分,split 的数量等于 block 的数量,如果文件不支持切分(如gz
格式)就只有一个 split; - 每个 split 对应 RDD 的一个 Partition(分区),每个分区对应一个 Task(计算任务)。每个 Task 都会被分配给某个 Executor 的某个 core 去执行;
计算函数
一个对每个 split(分片)进行计算的函数:
1 | /** |
- RDD 的计算是以分区为单位的,每个 RDD 都会实现
compute
函数。如果 RDD 是通过已有的文件系统构建的,则compute
函数读取指定文件系统中的数据;如果 RDD 是通过其他 RDD 转换而来,则compute
函数执行转换逻辑,对其他 RDD 的数据进行转换。该函数会对迭代器进行复合,不需要保存每次计算的结果。 - RDD 是只读的,要想改变 RDD 中的数据,只能在现有的 RDD 基础上创建新的 RDD。由一个 RDD 转换到另一个 RDD,可以通过丰富的操作算子实现,不像 MapReduce 那样只能写 map 和 reduce。
依赖关系
RDD 之间的依赖关系。RDD 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDD 衍生所必需的信息,RDD 之间维护着这种依赖关系,称之为血缘。当部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不用对 RDD 的所有分区重新进行计算。
1 | /** |
分区函数
key-value
型 RDD 的 Partitioner(分区函数)。当前 Spark 中实现了两种类型的分区函数,一个是基于哈希的 HashPartitioner
,另外一个是基于范围的 RangePartitioner
。只有对于 key-value
型的 RDD,才会有 Partitioner,非 key-value
型的 RDD 的 Parititioner 值是 None
。Partitioner 函数不但决定了 RDD 本身的分区数量,也决定了父 RDD 在 Shuffle 时输出的分区数量。
1 | /** Optionally overridden by subclasses to specify how they are partitioned. */ |
优先位置列表
一个存取每个 split 的 preferred location(优先位置)列表。对于 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的 block 的位置。根据“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到存储要处理数据块所在的节点。
1 | /** |
查看不同格式的文件对应的 Partition 数量
1 | scala> val rdd1=sc.textFile("hdfs://node01:8020/input/bigFile") |
RDD的作用
在执行 Spark 计算任务时,首先需要申请资源,然后将应用程序的数据处理逻辑划分成一个个计算任务,之后将任务分发到分配有资源的计算节点上进行计算,最后得到计算结果。在 Yarn 环境中 Spark 程序的运行流程如下:
- 启动 Yarn 集群环境:
- Spark 申请资源,选择一些 NodeManager 创建调度节点和计算节点:
- Spark 框架根据需求将计算逻辑根据分区划分成不同的任务:
- 调度节点根据计算节点状态将任务发送到计算节点进行计算:
RDD 是 Spark 用于数据处理的核心模型,从上面的流程中可以看出 RDD 主要作用是将计算逻辑进行封装,生成 Task 发送给 Executor 节点执行计算。
创建RDD
创建 RDD 有三种方式:
- 通过已有的集合创建:
1 | // 使用 parallelize() 根据集合创建 RDD |
- 通过读取文件创建,支持本地文件系统、HDFS、HBase 等:
1 | // 从本地文件系统中创建 RDD |
- 通过已有的 RDD 经过算子转换,生成新的 RDD:
1 | val rdd5 = rdd4.flatMap(_.split(" ")) |
算子
RDD的算子分为两类:
- Transformation:转换算子,根据 RDD 创建一个新的 RDD。这类算子都是延迟加载的,它们不会直接执行计算,只是记住转换动作。只有要求返回结果给 Driver 时,这些转换才会真正运行。这种设计让 Spark 的运行更加高效。
- Action:行动算子,对 RDD 计算后返回一个结果给 Driver,会直接执行计算。
Transformation
转换算子 | 说明 |
---|---|
map(f: T => U) | 对每个元素执行函数计算,返回MappedRDD[U] 。 |
flatmap(f: T => TraversableOnce[U]) | 首先对每个元素执行函数计算,然后将结果展平,返回FlatMappedRDD[U] 。 |
filter(f: T => Boolean) | 保留函数计算结果为true 的元素,返回FilteredRDD[T] 。 |
mapPartitions(Iterator[T] => Iterator[U]) | 对每个分区执行函数计算,返回MapPartitionsRDD[U] 。 |
sample(withReplacement, fraction, seed) | 简单随机抽样,返回采样子集PartitionwiseSampledRDD[T] 。withReplacement 表示是抽出的数据是否放回,fraction 表示样本数量,seed 表示随机数发生器种子。 |
union(otherRdd[T]) | 对源RDD和参数RDD求并集,返回UnionRDD[T] 。 |
intersection(otherRdd[T]) | 对源RDD和参数RDD求交集,返回MapPartitionsRDD[T] 。 |
distinct([numTasks]) | 对源RDD进行去重后,返回新的RDD。numTasks 表示并行任务的数量,默认为8。 |
partitionBy() | 对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD数量一致就不进行分区,否则会生成ShuffledRDD 。 |
reduceByKey(f: (V, V) => V) | 在(K,V)类型的RDD上调用,使用指定的reduce函数将key相同的value聚合到一起,返回ShuffledRDD[(K, V)] 。同一台机器上的键值对在移动前会先组合,适合大数据复杂计算。 |
groupByKey() | 在(K,V)类型的RDD上调用,同样对每个key进行操作,但只生成一个sequence,返回ShuffledRDD[([K], Iterable[V])] 。所有的键值对都会被移动,网络开销大,不适合大数据复杂计算。 |
sortByKey([ascending], [numTasks]) | 在(K,V)类型的RDD上调用,K必须实现Ordered 接口,返回一个按照key进行排序的(K,V)的RDD。 |
sortBy(func,[ascending], [numTasks]) | 与sortByKey 类似,但是更灵活,可以用func 函数先对数据进行处理,按照处理后的数据比较结果排序。 |
join(otherDataset, [numTasks]) | 在(K,V)和(K,W)类型的RDD上调用,返回(K,(V,W))类型的RDD,相同key对应的所有元素对在一起。 |
cogroup(otherDataset, [numTasks]) | 在(K,V)和(K,W)类型的RDD上调用,返回(K,(Iterable |
coalesce(numPartitions,[shuffle]) | 将 RDD 的分区数减少到 numPartitions 。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。第二个参数 shuffle 为 Boolean 类型,默认为 false。 |
repartition(numPartitions) | 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于 numPartitions 。该算子总是需要通过网络混洗所有数据。底层是调用 coalesce(numPartitions, shuffle = true) 。 |
Action
行动算子 | 说明 |
---|---|
reduce(func) | 通过func函数聚合RDD中的所有元素,这个功能必须是可交换且可并联的。 |
collect() | 在 Driver 中,以数组的形式返回数据集的所有元素。 |
count() | 返回RDD的元素个数。 |
first() | 返回RDD的第一个元素,类似于 take(1)。 |
take(n) | 返回一个由数据集的前n个元素组成的数组。 |
takeSample(withReplacement, num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,withReplacement 表示是抽出的数据是否放回,num 表示样本数量,seed 表示随机数发生器种子。 |
takeOrdered(n, [ordering]) | 返回前几个的排序。 |
aggregate(zeroValue: U)(seqOp: (U,T) ⇒ U, combOp: (U, U) ⇒ U) | 先将每个分区里面的元素通过seqOp 函数和初始值进行聚合,然后用combOp 函数将每个分区的结果和初始值zeroValue 进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。 |
fold(num)(func) | aggregate的简化操作,seqOp 和combOp 一样,没有初始值。 |
saveAsTextFile(path) | 将数据集中的元素以textfile的形式保存到指定的目录下。Spark 会调用toString() 方法转换每个元素。 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下。 |
saveAsObjectFile(path) | 将数据集中的元素序列化成对象,存储到文件中。 |
countByKey() | 对(K,V)类型的RDD,返回一个(K,Int)的Map ,表示每个key对应的元素个数。 |
foreach(f: T => Unit) | 为数据集的每一个元素调用func 函数进行处理。 |
foreachPartition(f: Iterator[T] => Unit) | 为数据集的每一个分区调用func 函数进行处理。可以用来处理数据库连接等操作。 |
使用经验
reduceByKey
性能更好。reduceByKey
会在 map 端先进行本地 combine,可以大大减少要传输到 reduce 端的数据量,减小网络传输的开销。groupByKey
不会进行本地聚合,只是原封不动的把 ShuffleMapTask 的输出拉取到 ResultTask 的内存中,所有的数据都要进行网络传输,从而导致网络传输的性能开销很大。
只有在 reduceByKey
处理不了时,才用 groupByKey().map()
来替代。
1 | repartition(numPartitions:Int):RDD[T] |
repartition
会创建新的 partition 并使用 full shuffle,因此它是一个消耗昂贵的算子。Spark 还提供了一个经过优化的算子叫作 coalesce
,它会利用已有的 partition 来尽量减少数据 shuffle,尽量避免数据迁移。而 repartition
相当于 coalesce
中参数 shuffle
为 true
的简易实现。
使用时如何选择呢:
- 如果需要稍微减少分区,可以使用
coalesce
(用到coalesce
的场景默认其 shuffle 参数为 false),它会利用已有的 partition 来减少 shuffle,可以尽量避免数据迁移; - 如果要增加分区或减少分区到特别少甚至到 1,可以使用
repartition
或coalesce(1, true)
,避免 Spark 程序因为并行度不够而影响性能。
假设源 RDD 有 N 个分区,需要重新划分成 M 个分区:
- 如果 N < M。一般情况下 N 个分区有数据分布不均匀的状况,利用
HashPartitioner
函数将数据重新分区为 M 个,这时需要将 shuffle 设置为true
。在 shuffle 为false
的情况下,coalesce
为无效的,不进行 shuffle 过程,父 RDD 和子 RDD 之间是窄依赖关系。 - 如果 N > M 并且 N 和 M 相差不多(假如 N = 1000,M = 100),那么就可以将 N 个分区中的若干个分区合并成一个新的分区,最终合并为 M 个分区,这时可以将 shuffle 设置为
false
。 - 如果 N > M 并且两者相差悬殊,这时如果将 shuffle 设置为
false
,父子 RDD 是窄依赖关系,他们同处在一个 Stage 中,就可能造成 Spark 程序的并行度不够,从而影响性能。如果在 M = 1 的时候,为了使coalesce
之前的操作有更好的并行度,可以将 shuffle 设置为true
。
注:如果 shuffle 为 false
时,如果传入的参数大于现有的分区数目,RDD 的分区数不变,也就是说不经过 shuffle,是无法将 RDD 的分区数变多的。
- map:一次处理一个元素的数据
- mapPartitions:一次处理一批数据
mapPartitions 的优缺点:
- 优点:速度快,一次处理一批数据,即一次接收所有的 partition 数据,在 map 过程中需要频繁创建额外的对象(例如将 RDD 中的数据通过 jdbc 写入数据库,map 需要为每个元素创建一个连接,而 mapPartitions 为每个 partition 创建一个链接),则 mapPartitions 效率比 map 高的多。
- 缺点:容易出现内存溢出,当接收的 partition 的数据量较大时,例如 100 万数据, 一次传入一个 function 以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就导致 OOM;而 map 一般较少出现内存溢出。
依赖关系
依赖类型
RDD和它的父RDD的依赖关系有两种类型:窄依赖和宽依赖。
窄依赖(narrow dependency)指的是父 RDD 的每个 Partition 最多被子 RDD 的一个 Partition 使用,如 map
、filter
、union
、mapPartitions
等操作都会产生窄依赖。
宽依赖(wide dependency)指的是父 RDD 的每个 Partition 都有可能被子 RDD 的多个 Partition 使用,如 groupByKey
、reduceByKey
、sortByKey
等操作都会产生宽依赖。
join 操作既可能是窄依赖也可能是宽依赖
图左的join:两个RDD在进行join操作时,子RDD依赖于两个父RDD,但每个父RDD的每个 Partition 只被子RDD 的一个 Partition 使用,所以是窄依赖的join(join with inputs co-partitioned)。
图右的join:两个RDD在进行join操作时,子RDD的多个 Partition 依赖于父RDD 的同一个 Partition,所以是宽依赖的join(join with inputs not co-partitioned)。子RDD的 Partition 需要父RDD的所有 Partition 进行 join 的转换,涉及到了 shuffle。
Lineage
为了进行容错,RDD 通过所谓的血缘(Lineage)记住了它是如何从其它 RDD 转换过来的。RDD的 Lineage 记录的是粗粒度的,只记录单个块上执行的单个操作。当 RDD 的部分分区数据丢失时,它可以根据 RDD 的元数据信息和转换行为来重新运算和恢复丢失的数据分区。
DAG的生成
DAG(Directed Acyclic Graph)叫有向无环图,“有向”指有方向,“无环”指不能有闭环。DAG 描述了多个 RDD 之间的转换关系,RDD 和 RDD 之间是存在依赖关系的,子 RDD 记录了父 RDD 的引用,最后的 RDD 触发 Action。一个 DAG 就是一个 job。
根据 RDD 之间依赖关系的不同将 DAG 划分成不同的 Stage(调度阶段):
- 窄依赖:Partition 依赖关系是明确的,转换处理可以在同一个线程里完成,窄依赖就被Spark划分到同一个 Stage。
- 宽依赖:父 RDD 在 Shuffle 完成后,才能开始接下来的计算。宽依赖是划分 Stage 的依据。
为了并行计算。一个复杂的业务逻辑如果有 shuffle,那么就意味着上一个阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的结果。按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage。在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
- Spark 划分 Stage 的整体思路是:从后往前回溯,遇到宽依赖就断开,划分为一个 Stage,遇到窄依赖就将这个 RDD 加入该 Stage 中。
- 每个 Stage 里面 Task 的数量是由该 Stage 中最后一个 RDD 的 Partition 的数量所决定的。
- 最后一个 Stage 里面的任务类型是 ResultTask,前面其他所有的 Stage 的任务类型是 ShuffleMapTask。
- 代表当前 Stage 的算子一定是该 Stage 的最后一个计算步骤。
Shuffle的过程
宽依赖之间会划分 Stage,而 Stage 之间就是 Shuffle。
Spark 中负责 Shuffle 过程的执行、计算和处理的组件主要是 ShuffleManager。ShuffleManager 随着 Spark 的发展有两种实现的方式,分别为 HashShuffleManager 和 SortShuffleManager,因此 Spark 的 Shuffle 有 Hash-Based Shuffle 和 Sort-Based Shuffle 两种。
- Hash-Based Shuffle:过时的,会产生大量的中间磁盘文件,影响性能。是 Spark 1.2 之前的策略。
- Sort-Based Shuffle:每个 Task 会将所有的临时文件合并成一个磁盘文件,又分为普通机制和byPass机制:
- 普通机制:会对数据进行排序,造成 Shuffle 的速度稍慢。
- byPass机制:不会对数据进行排序,执行效率较高。
两种机制的介绍详见: spark之shuffle原理及性能优化
任务调度
任务调度流程图
各个 RDD 之间的依赖关系形成DAG(有向无环图),DAGScheduler 对这些依赖关系形成的 DAG 进行 Stage 划分:从后往前回溯,遇到宽依赖就断开,划分为一个 Stage,遇到窄依赖就将这个 RDD 加入该 Stage中。完成了 Stage 的划分,DAGScheduler 基于每个 Stage 生成 TaskSet,并将 TaskSet(一组Task)提交给 TaskScheduler。TaskScheduler 负责具体的任务调度,最后在 Worker 节点上启动 Task。
DAGScheduler
- DAGScheduler 对 DAG 有向无环图进行Stage划分。
- 记录哪个 RDD 或 Stage 输出被缓存。在一个复杂的 Shuffle 之后,通常缓存一下(cache、persist),方便之后的计算。
- 重新提交 Shuffle 输出丢失的 Stage(Stage 内部计算出错)给 TaskScheduler。
- 将 TaskSet 传给任务调度器:
- spark-cluster TaskScheduler
- yarn-cluster YarnClusterScheduler
- yarn-client YarnClientClusterScheduler
TaskScheduler
- 为每一个 TaskSet 构建一个TaskSetManager 实例来管理这个TaskSet 的生命周期。
- 根据数据本地性判断运行每个 Task 的最佳位置。
- 提交 TaskSet 到集群运行并监控。
- 推测执行,遇到计算缓慢任务需要放到其他节点上重试。
- 重新提交 Shuffle 输出丢失的 Stage 给 DAGScheduler。
RDD的缓存
Spark 速度快的原因之一就是可以在内存中持久化数据集。如果在应用程序中多次使用同一个 RDD,可以将该 RDD 缓存起来,之后对此 RDD 或衍生出的 RDD 进行操作时可以重用。与RDD相关的持久化和缓存,是 Spark 最重要的特征之一,也是 Spark 构建迭代式算法和快速交互式查询的关键。
注意:缓存不会斩断血缘关系。
使用RDD缓存
RDD通过 persist()
方法或 cache()
方法可以缓存前面的计算结果:
- cache:只能将数据缓存到内存中。内存缓存不下就不进行缓存,下次执行时直接执行没有缓存到内存中的 Task 即可。
- persist:可以指定缓存级别。
这两个方法并非在被调用时立即缓存,而要等触发后面的 Action 时才会将该 RDD 缓存在计算节点的内存中。
1 | val data = sc.textFile(“hdfs://node01:8020/input”) |
RDD的缓存级别
缓存级别的说明如下:
Storage Level | 说明 |
---|---|
MEMORY_ONLY | 使用未序列化的 Java 对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则某些分区的数据就不会进行持久化。那么下次对这个 RDD 执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用 cache() 方法时,实际就是使用的这种持久化策略。 |
MEMORY_ONLY_SER | 基本含义同 MEMORY_ONLY 。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC。 |
MYMORY_AND_DISK | 使用未序列化的 Java 对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个 RDD 执行算子时,持久化在磁盘文件中的数据会被读取出来使用。 |
MEMORY_AND_DISK_SER | 基本含义同 MEMORY_AND_DISK 。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC。 |
DISK_ONLY | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 对于上述任意一种持久化策略,如果加上后缀 _2 ,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对 RDD 计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。 |
OFF_HEAP(试验性) | RDD 的数据序例化之后存储至 Tachyon 。相比于 MEMORY_ONLY_SER ,OFF_HEAP 能够减少垃圾回收开销、使得 Spark Executor 更“小”更“轻”的同时可以共享内存;而且数据存储于 Tachyon 中,Spark 集群节点故障并不会造成数据丢失,因此这种方式在“大”内存或多并发应用的场景下是很有吸引力的。需要注意的是,Tachyon 并不直接包含于 Spark 的体系之内,需要选择合适的版本进行部署;它的数据是以“块”为单位进行管理的,这些块可以根据一定的算法被丢弃,且不会被重建。 |
选择缓存级别
MEMORY_ONLY > MEMORY_ONLY_SER > MEMORY_AND_DISK_SER > MEMORY_AND_DISK
默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。
如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。
源码分析
查看 StorageLevel
类的源码,看一下 RDD 都有哪些缓存级别:
1 | class StorageLevel private( |
StorageLevel
类的主构造器包含了 5 个参数:
useDisk
:使用硬盘。useMemory
:使用内存。useOffHeap
:使用堆外内存。堆外内存意味着把内存对象分配在Java虚拟机的 Heap 以外的内存,这些内存直接受操作系统管理。这样做的结果就是能保持一个较小的堆,以减少垃圾回收器对应用的影响。deserialized
:反序列化,把字节流恢复为结构化对象。其逆过程是序列化(Serialization),把结构化对象转化为字节流。序列化方式存储对象可以节省磁盘或内存的空间。replication
:备份数(在多个节点上备份)。
根据这五个参数,理解 StorageLevel
的各种缓存级别就不难了:
1 | //RDD将存储在硬盘和内存中,使用序列化,并且在多个节点上备份2份 |
cache 方法和 persist 方法的联系
查看源码会发现 cache()
调用了 persist()
:
1 | def cache(): this.type = persist() |
RDD的检查点
- Spark 在生产环境下经常会面临 transformation 的 RDD 非常多或 transformation 的 RDD 本身计算特别复杂特别耗时,这时就要考虑对计算结果数据持久化保存。
- Spark 擅长多步骤迭代,也擅长基于 Job 的复用。如果能够对曾经计算产生的数据进行复用,可以极大的提升效率。
- 如果采用
persist
方法把数据缓存在内存中,虽然快速但易失;如果把数据缓存在磁盘上,则没有这个问题。
Checkpoint 的作用就是为了相对缓存而言更加可靠的持久化数据。Checkpoint 可以指定把数据以多复本的方式放在本地;在生产环境中可以把数据放在 HDFS 上,借助 HDFS 天然的高容错、高可靠的特征来实现可靠的持久化数据。
Checkpoint 会斩断之前的血缘关系,Checkpoint 后的 RDD 不知道它的父 RDD 了,从 Checkpoint 处直接拿到数据。对于长时间迭代型应用来说,迭代过程出错也不必通过非常长的血缘关系去重建分区数据了。
使用 Checkpoint 首先需要调用 sparkContext
的 setCheckpointDir()
方法,设置一个容错文件系统目录(如 HDFS),然后对RDD调用 checkpoint()
方法。之后在 RDD 所处的 Job 运行结束后,会启动一个单独的 Job 来将 Checkpoint 过的数据写入之前设置的文件系统。
1 | sc.setCheckpointDir("hdfs://node01:8020/ckpdir") |
- 存储位置不同:缓存的数据保存在 BlockManager 的内存或磁盘上;Checkpoint 的数据保存在 HDFS 上。
- 对血缘的影响不同:缓存的 RDD 的不会丢掉 Lineage,可以通过血缘关系重新计算;Checkpoint 执行完后,已经将当前计算结果安全保存在 HDFS,会斩断 Lineage。
- 生命周期不同:缓存的 RDD 会在程序结束后被清除;Checkpoint 保存的 RDD 在程序结束后依然存在于 HDFS。
数据读取与保存
文本文件
读取
1 | val input = sc.textFile(inputPath) |
注:路径支持通配符;如果传递目录,则将目录下的所有文件读取作为RDD。
将一个文本文件读取为 RDD 时,输入的每一行都会成为 RDD 的一个元素。也可以将多个文本文件一次性读取为一个 pair RDD, 其中 key 是文件名, value 是文件内容。
通过 wholeTextFiles()
对于大量的小文件读取效率比较高,大文件效果没有那么高。
保存
1 | result.saveAsTextFile(outputPath) |
注:路径作为输出目录,在目录下输出多个文件。
Demo
1 | scala> sc.textFile("./data/wordcount.txt") |
JSON文件
读取
如果JSON文件中每一行就是一条JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。如果JSON数据是跨行的,那么只能读入整个文件,然后对每个文件进行解析。
保存
在输出之前将结构化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去,本质还是以文本文件的形式保存。
Demo
1 | scala> import org.json4s._ |
CSV文件
读取
读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,然后通过将每一行进行解析实现对CSV的读取。
保存
CSV/TSV 数据的输出也是需要将结构化 RDD 通过相关的库转换成字符串 RDD,然后使用 Spark 的文本文件 API 写出去。
SequenceFile文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种文件格式。
读取
Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中调用 sequenceFile[keyClass, valueClass](path)
。
保存
调用 saveAsSequenceFile(path)
保存 PairRDD。需要 key 和 value 能够自动转为 Writable 类型。
Demo
1 | scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee"))) |
Object文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。
读取
调用 objectFile[k,v](path)
读取对象文件,返回对应的 RDD。因为要反序列化所以要指定类型。
保存
调用saveAsObjectFile()
保存对象文件。
Demo
1 | scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee"))) |
Hadoop
Spark 的整个生态系统与 Hadoop 完全兼容,对于 Hadoop 所支持的文件类型或者数据库类型,Spark 也同样提供支持。由于 Hadoop 的 API 有新旧两个版本,所以 Spark 也提供了两套接口。
读取
对于读取 Hadoop 存储而言,hadoopRDD
和 newHadoopRDD
是最为抽象的两个函数接口,主要包含以下四个参数:
- 输入格式:指定数据输入的类型,如
TextInputFormat
等。 - 新版:
org.apache.hadoop.mapred.InputFormat
。 - 旧版:
org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
。 - 键类型:指定[K,V]键值对中K的类型。
- 值类型:指定[K,V]键值对中V的类型。
- 分区值:指定由外部存储生成的 RDD 的 Partition 数量的最小值,如果没有指定,系统会使用默认值
defaultMinSplits
。
其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本。例如,对于 textFile 而言,只需指定 path
这个文件路径的参数,其他参数在系统内部有默认值。
兼容旧版HadoopAPI的读取操作
文件路径 | 输入格式 | 键类型 | 值类型 | 分区值 | |
---|---|---|---|---|---|
textFile(path:String,minPartitions:Int=defaultMinPartitions) | path | TextInputFormat | LongWritable | Text | minSplits |
hadoopFile[K,V,F <: InputFormat[K,V]] (path:String,minPartitions:Int)(implicit km:ClassTag[K],vm:ClassTag[V],fm:ClassTag[F]):RDD[(K,V)] | path | F | K | V | minSplits |
hadoopFile[K,V,F <: [K,V]] (path:String)(implicit km:ClassTag[K],vm:ClassTag[V],fm:ClassTag[F]):RDD[(K,V)] | path | F | K | V | DefaultMinSplits |
hadoopFile[K,V] (path:String,inputFormatClass:Class[_ <: InputFormat[K,V]], keyClass:Class[K],valueClass:Class[V],minPartitions:Int=defaultMinPartitions):RDD[(K,V)] | path | inputFormatClass | keyClass | valueClass | defaultMinPartitions |
hadoopRDD[K,V] (conf:JobConf,inputFormatClass:Class[_ <: InputFormat[K, V]], keyClass:Class[K],valueClass:Class[V],minPartitions:Int=defaultMinPartitions):RDD[(K, V)] | n/a | inputFormatClass | keyClass | valueClass | defaultMinPartitions |
sequenceFile[K,V] (path:String,minPartitions:Int=defaultMinPartitions)(implicit km:ClassTag[K],vm:ClassTag[V],kcf:()⇒WritableConverter[K], vcf:()⇒WritableConverter[V]):RDD[(K,V)] | path | SequenceFileInputFormat[K,V] | K | V | defaultMinPartitions |
objectFile[T] (path:String,minPartitions:Int=defaultMinPartitions)(implicit arg0:ClassTag[T]):RDD[T] | path | SequenceFileInputFormat[NullWritable, BytesWritable] | NullWritable | BytesWritable | minSplits |
兼容新版HadoopAPI的读取操作
文件路径 | 输入格式 | 键类型 | 值类型 | 分区值 | |
---|---|---|---|---|---|
newAPIHadoopFile[K,V,F <: InputFormat[K,V]] (path:String,fClass:Class[F],kClass:Class[K],vClass:Class[V],conf:Configuration=hadoopConfiguration):RDD[(K,V)] | path | F | K | V | n/a |
newAPIHadoopFile[K,V,F <: InputFormat[K,V]] (path:String)(implicit km:ClassTag[K],vm:ClassTag[V],fm:ClassTag[F]):RDD[(K,V)] | path | F | K | V | n/a |
newAPIHadoopRDD[K,V,F <: InputFormat[K,V]] (conf:Configuration=hadoopConfiguration,fClass:Class[F],kClass:Class[K],vClass:Class[V]):RDD[(K,V)] | n/a | F | K | V | n/a |
保存
将 RDD 保存到 HDFS 中,通常需要设置以下五个参数:
- 输出路径:指定数据输出的路径。
- 键类型:指定[K,V]键值对中K的类型。
- 值类型:指定[K,V]键值对中V的类型。
- 输出格式:RDD 的输出格式,如
TextOutputFormat
、SequenceFileOutputFormat
。 - codec:数据存储的压缩格式,如
DefaultCodec
、Gzip
等。
兼容旧版HadoopAPI的保存操作
前六个 API 都是 saveAsHadoopDataset
的简易实现,仅支持将 RDD 存储到 HDFS;saveAsHadoopDataset
的参数类型是 JobConf
,不仅能够将 RDD 存储到 HDFS 中,也可以将 RDD 存储到其他数据库中,如 HBase、MangoDB 等。
1 | saveAsObjectFile(path:String): Unit |
兼容新版HadoopAPI的保存操作
前两个 API 是 saveAsNewAPIHadoopDataset
的简易实现,仅支持将 RDD 存储到 HDFS;saveAsNewAPIHadoopDataset
可以将 RDD 输出到任何 Hadoop 支持的存储系统。新版 API 没有 codec 的参数,所以要压缩文件需要使用 hadoopConfiguration
参数,设置对应 mapreduce.map.output.compress.codec
参数和 mapreduce.map.output.compress
参数。
1 | saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit |
Demo
读取示例:
1 | scala> import org.apache.hadoop.io._ |
写入示例:
1 | scala> val read = sc.newAPIHadoopFile[LongWritable, Text, org.apache.hadoop.mapreduce.lib.input.TextInputFormat]("hdfs://node01:8020/output4/part*", classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat], classOf[LongWritable], classOf[Text]) |
数据库
MySQL
支持通过 JDBC 访问关系型数据库。需要通过 JdbcRDD
进行数据读取和写入。
读取
1 | def main(args: Array[String]) { |
写入
1 | def main(args: Array[String]) { |
Cassandra数据库和ElasticSearch集成
HBase
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat
类的实现,Spark 可以通过 Hadoop 输入格式访问 HBase。这个输入格式会返回键值对数据,其中键的类型为 org. apache.hadoop.hbase.io.ImmutableBytesWritable
,而值的类型为 org.apache.hadoop.hbase.client.Result
。
导入依赖
1 | <repositories> |
读取
1 | def main(args: Array[String]) { |
写入
1 | def main(args: Array[String]) { |
程序示例
访问日志分析
需求
根据访问日志 access.log
,统计 PV、UV 和访问次数 Top10 的IP。日志数据格式如下:
1 | 194.230.140.21 - - [18/Sep/2017:06:49:18 +0000] "GET /content/uploads/2017/07/active.html HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)" |
导入依赖
1 | <properties> |
统计PV
PV(Page View),即页面点击量,用户每次对网站中的每个网页访问均被记录1次PV。用户对同一页面的多次访问,访问量累计。因此日志每条记录可以视为一次PV,统计日志记录的数量便可得到PV。
1 | object PV { |
统计UV
UV(Unique Visitor),即浏览这个网页的访客人数。每个用户对网站的多次访问只被记录1次UV,一般用IP地址标识用户。因此需要对每条日志记录的内容进行分析,得到每条记录的IP,并根据IP去重,得到UV。
1 | object UV { |
统计Top10
1 | object Top10 { |
访客地理位置分析
需求
通过城市热力图可以直观地展示访客所在的地理位置,需要使用 Spark 根据 IP 地址计算出对应的经纬度。
访问日志:
1 | 194.230.140.21 - - [18/Sep/2017:06:49:18 +0000] "GET /content/uploads/2017/07/active.html HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)" |
IP段对应的经纬度:
1 | 114.218.107.0|114.218.107.255|1926916864|1926917119|亚洲|中国|江苏|苏州|昆山|电信|320583|China|CN|120.98074|31.38464 |
广播变量
根据 ip-city.txt
可以得到IP地址库数据集,该数据集会被其他数据集使用,适合作为广播变量。
广播变量可以用来高效分发较大的对象,由 Driver 端定义,在 Executor 端使用。Task 想要使用广播变量中的数据时,会从当前 Executor 节点对应的 BlockManager 中尝试获取变量副本,如果没有就从 Driver 端远程拉取并保存在 BlockManager。广播变量是被 Executor 节点上的所有 Task 共用的,不用每个 Task 携带一份,节省了通信的成本和服务器的资源。
广播变量使用方法如下:
- 对象 T 调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。
- 通过 value 属性获得该对象的值。
- 变量只会被发到各个节点一次,应作为只读值处理。
1 | scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) |
代码实现
1 | object IPtoLocation { |
参考资料
Spark 2.2.x 中文文档 - RDD 编程指南
Understand RDD Operations: Transformations and Actions 作者:Khoa Nguyen
Spark学习之路 (三)Spark之RDD 作者:扎心了,老铁
coalesce 与 repartition的区别 作者:姜小嫌
Spark性能优化指南——基础篇 作者:李雪蕤(美团技术团队)
Spark性能优化指南——高级篇 作者:李雪蕤(美团技术团队)