2022-02-24-Spark-44(性能调优通用调优)

2023-06-30

本文主要是 2022-02-24-Spark-44(性能调优通用调优) 相关的知识问答,如果你也了解,请帮忙补充。

参考知识1 计算负载主要由 Executors 承担,Driver 主要负责分布式调度,调优空间有限,因此对 Driver 端的配置项我们不作考虑

通过如下参数就可以明确有多少 CPU 资源被划拨给 Spark 用于分布式计算。
spark.cores.max 集群
spark.executor.cores Executor
spark.task.cpus 计算任务

并行度
spark.default.parallelism 并行度
spark.sql.shuffle.partitions 用于明确指定数据关联或聚合操作中 Reduce 端的分区数量。

在平衡 Execution memory 与 Storage memory 的时候,如果 RDD 缓存是刚需,我们就把 spark.memory.storageFraction 调大,并且在应用中优先把缓存灌满,再把计算逻辑应用在缓存数据之上。除此之外,我们还可以同时调整 spark.rdd.compress 和 spark.memory.storageFraction 来缓和 Full GC 的冲击

spark.local.dir 这个配置项,这个参数允许开发者设置磁盘目录,该目录用于存储 RDD cache 落盘数据块和 Shuffle 中间文件。如果你的经费比较充裕,有条件在计算节点中配备足量的 SSD 存储,甚至是更多的内存资源,完全可以把 SSD 上的文件系统目录,或是内存文件系统添加到 spark.local.dir 配置项中去,从而提供更好的 I/O 性能。

Configuration - Spark 3.2.1

自 1.6 版本之后,Spark 统一采用 Sort shuffle manager 来管理 Shuffle 操作,在 Sort shuffle manager 的管理机制下,无论计算结果本身是否需要排序,Shuffle 计算过程在 Map 阶段和 Reduce 阶段都会引入排序操作。
在不需要聚合,也不需要排序的计算场景中,我们就可以通过设置 spark.shuffle.sort.bypassMergeThreshold 的参数,来改变 Reduce 端的并行度(默认值是 200)。当 Reduce 端的分区数小于这个设置值的时候,我们就能避免 Shuffle 在计算过程引入排序。

Spark SQL 下面的配置项还是蛮多的,其中对执行性能贡献最大的,当属 AQE(Adaptive query execution,自适应查询引擎)引入的那 3 个特性了,也就是自动分区合并、自动数据倾斜处理和 Join 策略调整。

AQE 事先并不判断哪些分区足够小,而是按照分区编号进行扫描,当扫描量超过“目标尺寸”时,就合并一次,那么,“目标尺寸”由什么来决定的呢?Spark 提供了两个配置项来共同决定分区合并的“目标尺寸”,分区合并的目标尺寸取 advisoryPartitionSizeInBytes 与 partitionSize (每个分区的平均大小)之间的最小值。
我们来举个例子。假设,Shuffle 过后数据大小为 20GB,minPartitionNum 设置为 200,反推过来,每个分区的尺寸就是 20GB / 200 = 100MB。再假设,advisoryPartitionSizeInBytes 设置为 200MB,最终的目标分区尺寸就是取(100MB,200MB)之间的最小值,也就是 100MB。因此你看,并不是你指定了 advisoryPartitionSizeInBytes 是多少

首先,分区尺寸必须要大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 参数的设定值,才有可能被判定为倾斜分区。然后,AQE 统计所有数据分区大小并排序,取中位数作为放大基数,尺寸大于中位数一定倍数的分区会被判定为倾斜分区,中位数的放大倍数也是由参数 spark.sql.adaptive.skewJoin.skewedPartitionFactor(默认值是 5 倍) 控制。

实际上指的是,把会引入 Shuffle 的 Join 方式,如 Hash Join、Sort Merge Join,“降级”(Demote)为 Broadcast Join。
在 Spark 发布 AQE 之前,开发者可以利用 spark.sql.autoBroadcastJoinThreshold 配置项对数据关联操作进行主动降级。这个参数的默认值是 10MB,参与 Join 的两张表中只要有一张数据表的尺寸小于 10MB

不过,autoBroadcastJoinThreshold 这个参数虽然好用,但是有两个让人头疼的短板。一是可靠性较差。尽管开发者明确设置了广播阈值,而且小表数据量在阈值以内,但 Spark 对小表尺寸的误判时有发生,导致 Broadcast Join 降级失败。二来,预先设置广播阈值是一种静态的优化机制,它没有办法在运行时动态对数据关联进行降级调整。

AQE 很好地解决了这两个头疼的问题。首先,AQE 的 Join 策略调整是一种动态优化机制,对于刚才的两张大表,AQE 会在数据表完成过滤操作之后动态计算剩余数据量,当数据量满足广播条件时,AQE 会重新调整逻辑执行计划,在新的逻辑计划中把 Shuffle Joins 降级为 Broadcast Join。再者,运行时的数据量估算要比编译时准确得多,因此 AQE 的动态 Join 策略调整相比静态优化会更可靠、更稳定。

每个 Map Task 生成的数据文件,都包含所有 Reduce Task 所需的部分数据。因此,任何一个 Reduce Task 要想完成计算,必须先从所有 Map Task 的中间文件里去拉取属于自己的那部分数据。索引文件正是用于帮助判定哪部分数据属于哪个 Reduce Task。Reduce Task 通过网络拉取中间文件的过程,实际上就是不同 Stages 之间数据分发的过程。
显然,Shuffle 中数据分发的网络开销,会随着 Map Task 与 Reduce Task 的线性增长,呈指数级爆炸。

Shuffle Joins
第一步就是对参与关联的左右表分别进行 Shuffle,Shuffle 的分区规则是先对 Join keys 计算哈希值,再把哈希值对分区数取模。Shuffle 完成之后,第二步就是在同一个 Executors 内,Reduce task 就可以对 userID 一致的记录进行关联操作。

Broadcast Join

使用广播阈值配置项让 Spark 优先选择 Broadcast Joins 的关键,就是要确保至少有一张表的存储尺寸小于广播阈值(数据表在磁盘上的存储大小,同一份数据在内存中的存储大小往往会比磁盘中的存储大小膨胀数倍)

Spark 将内存分成了 Execution Memory 和 Storage Memory 两类,分别用于分布式任务执行和 RDD 缓存。其中,RDD 缓存虽然最终占用的是 Storage Memory,但在 RDD 展开(Unroll)之前,计算任务消耗的还是 Execution Memory。因此,Spark 中 CPU 与内存的平衡,其实就是 CPU 与执行内存之间的协同与配比。

并行度指的是为了实现分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。并行度可以通过两个参数来设置,分别是 spark.default.parallelism 和 spark.sql.shuffle.partitions。前者用于设置 RDD 的默认并行度,后者在 Spark SQL 开发框架下,指定了 Shuffle Reduce 阶段默认的并行度。并发度呢?Executor 的线程池大小由参数 spark.executor.cores 决定,每个任务在执行期间需要消耗的线程数由 spark.task.cpus 配置项给定。两者相除得到的商就是并发度,也就是同一时间内,一个 Executor 内部可以同时运行的最大任务数量。又因为,spark.task.cpus 默认数值为 1,并且通常不需要调整,所以,并发度基本由 spark.executor.cores 参数敲定。就 Executor 的线程池来说,尽管线程本身可以复用,但每个线程在同一时间只能计算一个任务,每个任务负责处理一个数据分片。因此,在运行时,线程、任务与分区是一一对应的关系。

对于 User Memory 内存区域来说,使用 空间去重复存储同样的数据,本身就是降低了内存的利用率

对于存储级别来说,实际开发中最常用到的有两个,MEMORY_ONLY 和 MEMORY_AND_DISK,它们分别是 RDD 缓存和 DataFrame 缓存的默认存储级别。对于缓存计算来说,它分为 3 个步骤,第一步是 Unroll,把 RDD 数据分片的 Iterator 物化为对象值,第二步是 Transfer,把对象值封装为 MemoryEntry,第三步是把 BlockId、MemoryEntry 价值对注册到 LinkedHashMap 数据结构。另外,当数据缓存需求远大于 Storage Memory 区域的空间供给时,Spark 利用 LinkedHashMap 数据结构提供的特性,会遵循 LRU 和兔子不吃窝边草这两个基本原则来清除内存空间:LRU:按照元素的访问顺序,优先清除那些“最近最少访问”的 BlockId、MemoryEntry 键值对兔子不吃窝边草:在清除的过程中,同属一个 RDD 的 MemoryEntry 拥有“赦免权”

PROCESS_LOCAL:任务与数据同在一个 JVM 进程中
NODE_LOCAL:任务与数据同在一个计算节点,数据可能在磁盘上或是另一个 JVM 进程中
RACK_LOCAL:任务与数据不在同一节点,但在同一个物理机架上
ANY:任务与数据是跨机架、甚至是跨 DC(Data Center,数据中心)的关系访问数据源是否会引入网络开销,取决于任务与数据的本地性关系,也就是任务的本地性级别

Shuffle 作为大多数计算场景的“性能瓶颈担当”,确实是网络开销的罪魁祸首。根据“能省则省”的开发原则,我们自然要想尽办法去避免 Shuffle。

在数据通过网络分发之前,我们可以利用 Kryo Serializer 序列化器,提升序列化字节的存储效率,从而有效降低在网络中分发的数据量,整体上减少网络开销。
相似知识
Flink性能调优(一) 参考知识1Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大。可以通过监控GC(GarbageCollection),评估内存使用及剩余情况来判断内存是否变成性能瓶颈,并根据情
JVM性能调优-G1 参考知识1本篇是对Java官网G1收集器调优的精简版。针对G1垃圾的收集阶段可能出现的问题,非合理内存分配,大对象占用,FullGC等问题作出解决方式和操作参数。G1是一个吞吐量和时间延迟之间相互平衡
JVM性能调优指南(一) 参考知识1-help-server-client-version-showversion-cp-classpath调整为完全解释执行编译模式:调整为编译执行编译模式:最后一行的mixedmode表明J
2022-02-26-Spark-46(性能调优SparkUI) 参考知识1ExecutorsTab的主要内容如下,主要包含“Summary”和“Executors”两部分。这两部分所记录的度量指标是一致的,其中“Executors”以更细的粒度记录着每一个Exec
tomcat 性能调优 参考知识1java性能优化原则:代码运算性能内存回收应用配置(影响java程序注意原因是垃圾回收)代码层优化:避免过多的循环嵌套调用和复杂逻辑Tomcat调优主要内容1.增加最大连接数2.调整工作模式
jvm性能调优都做了啥 JVM是最好的软件工程之一,它为Java提供了坚实的基础,许多流行语言如Kotlin、Scala、Clojure、Groovy都使用JVM作为运行基础。一个专业的Java工程师必须要了解并掌握JVM,
北大青鸟设计培训:简单的Java性能调优技巧? 参考知识1  大多数JAVA开发人员理所当然地以为性能优化很复杂,需要大量的经验和知识。好吧,不能说这是完全错误的。  优化应用程序以获得最佳性能不是一件容易的
Spark性能调优篇七之JVM相关参数调整 参考知识1        由于Spark程序是运行在JVM基础之上的,所以我们这一篇来讨论一下关于JVM的一些优化操作。