Shuffle 操作
牵扯到跨节点的网络传输以及 IO 操作,是复杂且昂贵的操作,所以后续对它的优化是重中之重
In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations,
a single task will operate on a single partition - thus, to organize all the data for a single `reduceByKey` reduce task to execute,
Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring
together values across partitions to compute the final result for each key - this is called the shuffle
概括来说某个算子需要其他分区的结果集的时候,将其他分区的结果传递汇总的过程就称之为 Shuffle.所以说 sort,distinct,reduce,group,aggregate 等都会产生 Shuffle 操作。
窄/宽依赖
窄依赖:父 RDD 中,每个分区内的数据,都只会被子 RDD 中特定的分区所消费,父子分区消费关系为 1 对 1
宽依赖:父 RDD 的每个分区都可能被多个子 RDD 分区所消费,父子分区消费关系为 1 对 N
宽依赖和窄依赖如下图所示:
相对于宽依赖,窄依赖对优化很有优势,主要有以下几点:
1)窄依赖不会产生 Shuffle 操作,所以不会像宽依赖那样有昂贵的 IO 操作以及网络传输。
2)RDD 分区丢失的时候,窄依赖只要计算对应的子分区对应的父分区即可,而宽依赖的子分区的数据可能来源于多个父分区,会产生额外的冗余计算,极端情况下,可能全部父分区都要重新计算。
常用的窄依赖算子:
map,mapToPair,mapPartitions,filter,union,flatMap,flatMapToPair,mapValues,flatMapValues,join(父 RDD 是 hash-partitioned)
常用的宽依赖算子:
sort,distinct,reduce,group,aggregate,partitionBy,join(父 RDD 不是 hash-partitioned)
分区策略
为了保证数据的均匀分布,spark 有 2 种分区策略,一种是 hash 分区,一种是范围分区.
1)hash 分区(HashPartitioner),spark 的默认分区策略。
// 部分代码
public int getPartition(Object key) {
int var3;
if (key == null) {
var3 = 0;
} else {
var3 = .MODULE$.nonNegativeMod(key.hashCode(), this.numPartitions());
}
return var3;
}
//scala代码
def nonNegativeMod(x: Int, mod: Int): Int = { val rawMod = x % mod rawMod + (if (rawMod < 0) mod else 0) }
取 key 的 hashCode,然后对分区个数取模,取模后的值就是数据将要进入的分区。如果该值小于 0,则该值再加上分区个数。
2)范围分区(RangePartitioner)
这个分区的主要逻辑:
2-1 抽样,先重整个 RDD 中抽取出样本数据,将样本数据排序(默认升序),计算出每个分区的最大 key 值,形成一个 array[key]类型的数组变量 rangeBounds
2-2 确定边界,判断 key 在 rangeBounds 中所处的范围,给出该 key 值在下一个 RDD 中的分区 id 下标
public int getPartition(Object key) {
Object k = key;
int partition = 0;
if (.MODULE$.array_length(this.org$apache$spark$RangePartitioner$rangeBounds()) <= 128) {
while(partition < .MODULE$.array_length(this.org$apache$spark$RangePartitioner$rangeBounds()) && this.org$apache$spark$RangePartitioner$ordering().gt(k, .MODULE$.array_apply(this.org$apache$spark$RangePartitioner$rangeBounds(), partition))) {
++partition;
}
} else {
partition = BoxesRunTime.unboxToInt(this.org$apache$spark$RangePartitioner$binarySearch().apply(this.org$apache$spark$RangePartitioner$rangeBounds(), key));
if (partition < 0) {
partition = -partition - 1;
}
if (partition > .MODULE$.array_length(this.org$apache$spark$RangePartitioner$rangeBounds())) {
partition = .MODULE$.array_length(this.org$apache$spark$RangePartitioner$rangeBounds());
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于