问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Spark性能调优:Spill内存溢出

创作时间:
作者:
@小白创作中心

Spark性能调优:Spill内存溢出

引用
CSDN
1.
https://blog.csdn.net/lssilu/article/details/145065474

1.优化案例效果

内存溢出导致任务超长运行时间达到1个多小时。优化后,运行时间从1小时缩短到30分钟左右。

2.问题定位

打开SparkUI定位到运行时长最长的stage62,运行了19min,虽然不是很长,但是shuffle和output的大小很大,而task数又较少。查看detail发现output和shuffle read的文件大小、溢出都很大,这样会导致task处理时间较长。同时task的并行度仅为300,因此可以考虑增大shuffle并发度,减少每个task处理的数据量。

将并发度增大到3000。定位到运行最长的stage14,运行时长为6.5min。查看detail可以看到基本上没有什么溢出,每个task的输出和shuffle read的大小也减少了很多,因此运行速度加快很多。而总体任务运行时长从1h11min减少到30min。

案例讲完了,补充一点内容,基础好的可以跳过。

3.知识延展

什么是内存溢出?

在Spark中,如果数据集太大,超出了处理数据的机器的内存,那么Spark就会把这部分数据先存到硬盘里,等内存有空了再从硬盘读回来。这个过程叫做溢出。这样做会很慢,因为硬盘读写速度远不如内存。

Spark内存分配和使用

了解这个结构有助于我们明白数据是如何在内存中处理的。假设我们有一个 Spark 集群,其中有一个Executor内存结构如图所示。

堆内存被分为四个部分:

  • 保留内存(Reserved Memory):大约300MB,Spark系统内部使用,确保系统运行所需的内存。
  • 用户内存(User Memory):用于存储用户定义的数据结构,如UDF,由Spark管理。
  • 存储内存(Storage Memory):用于缓存和广播处理任务的数据。
  • 执行内存(Execution Memory):用于Shuffle、Join、排序和聚合等操作。

我们通过一个例子来了解用户内存、存储内存和执行内存是如何分配的。假设Executor有10GB(10240MB)的内存,Spark 配置的默认值为:

spark.memory.fraction = 0.75
spark.memory.storageFraction = 0.5

计算过程如下:

预留内存 =300 MB
用户内存 =(10240 MB - 300 MB) * (1 - spark.memory.fraction) = 9940 MB * (1-0.75) = 2485 MB = 2.4 GB
存储内存 (10240 MB - 300 MB) * spark.memory.fraction * (1 - spark.memory.storageFraction) = 9940 MB * 0.75 * 0.5 = 3727 MB = 3.6 GB
执行内存 (10240 MB - 300 MB) * spark.memory.fraction * (1 - spark.memory.storageFraction) = 9940 MB * 0.75 * 0.5 = 3727 MB = 3.6 GB

从spark1.6开始,存储内存和执行内存的边界就不明显了,会根据任务的具体情况进行共享。这里要介绍一个“动态占用机制”

“动态占用机制”是指在Spark中,存储内存和执行内存是共享的,它们可以互相借用对方的内存空间。这意味着如果存储内存不够用,它可以借用执行内存的空间,反之亦然。这种机制允许Spark更灵活地管理内存资源,以适应不同任务的需求,减少内存溢出到磁盘的可能性。

溢出条件:当任务需要处理的数据总量超过了执行内存和存储内存的总和时,就会发生溢出。这是因为这些内存区域不足以容纳所有数据,所以超出部分的数据必须被写入到磁盘。

堆外内存:这是默认未启用的内存区域,可以通过设置参数来启用。

set spark.memory.offHeap.enabled = true

堆外内存是操作系统管理的,不受Java垃圾收集器的影响,因此可以减少垃圾收集的开销。

堆外内存的开销:尽管堆外内存可以减少垃圾收集的开销,但当Spark需要将数据从堆外内存移动到堆内内存时,会有一个额外的开销,即需要将数据从字节数组反序列化回Java对象。这个过程会增加处理时间,因此在决定是否使用堆外内存时需要权衡这个因素。简而言之,堆外内存可以提高性能,但也可能会因为数据移动和反序列化而增加额外的处理成本。

可能产生泄漏的动作

  1. 减少Shuffle分区
    通过设置spark.sql.shuffle.partitions为较小的值,会导致每个分区的文件更大,因为数据需要在更少的分区中重新分配。

  2. 错误的Shuffle分区管理
    如果Shuffle分区设置不当,可能会创建一个非常大的分区,导致内存溢出。

  3. 使用join或crossJoin:
    在两个大数据表之间进行连接操作可能会导致数据量急剧增加,尤其是crossJoin,因为它会产生两个表的笛卡尔积。

  4. 设置spark.sql.files.maxPartitionBytes为高值
    这个设置控制每个文件分区的最大字节数,如果设置得太高,可能会导致单个分区的数据量过大,超出内存限制。

  5. 对多列执行explode:
    explode函数会将数组中的每个元素转换为行,如果数组很大,会导致数据集的大小急剧增加。

  6. 数据倾斜:
    数据倾斜是指数据在不同分区之间分布不均匀,这可能导致某些分区的数据量特别大,从而超出内存容量。

这些操作都可能导致数据量增加,如果超出了内存容量,就会发生溢出,即数据被写入磁盘。

内存溢出解决方法

  1. 如果溢出是由于数据倾斜造成的,即某些分区的数据量远大于其他分区,那么首先应该解决这个问题。数据倾斜可以通过重新分配数据或优化查询逻辑来解决。

  2. 增加工作程序内存:为每个Executor分配更多的内存可以减少溢出的发生,因为这样可以增加每个Executor能够处理的数据量。

  3. 管理Shuffle分区:通过调整spark.sql.shuffle.partitions的值,可以控制Shuffle操作后分区的数量,从而减少每个分区的文件大小,避免单个分区数据过大导致溢出。

  4. 明确执行Repartitioning:使用.repartition()操作可以明确地重新分配数据到新的分区,这有助于平衡数据分布,减少单个分区的数据量。

  5. 设置文件最大分区字节:通过调整spark.sql.files.maxPartitionBytes的值,可以控制每个文件分区的最大字节数,这有助于控制单个分区的数据量,避免数据量过大导致溢出。

4.总结

掌握Spark性能调优,尤其是Spill溢出处理,对于优化大数据处理至关重要。通过解决数据倾斜、增加内存、合理设置Shuffle分区和文件分区大小,你可以显著提升Spark作业的效率,让你的Spark应用飞起来!

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号