Spark-理论笔记-执行内存管理
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程, Shuffle 的 Write 和 Read 两阶段对执行内存的使用.
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程, Shuffle 的 Write 和 Read 两阶段对执行内存的使用.
1. Shuffle
Shuffle Write
在 map 端会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
Shuffle Read
在 ExternalSorter 和 Aggregator 中, Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据 但在 Shuffle 过程中所有数据并不能都保存到该哈希表中, 当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度, 无法再从 MemoryManager 申请到新的执行内存时, Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存 [Spill] , 溢存到磁盘的文件最后会被归 并 [Merge]
- 在对 reduce端的数据进行聚合时, 要将数据交给 Aggregator处理, 在内存中存储数据时占用堆内执行空间。
- 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter处理,占用堆内执行空间
2. Shuffle 内存使用
在使用 Spark 进行计算时,经常会碰到作业 Out Of Memory(OOM)
的情况,而且很大一部分情况是发生在 Shuffle 阶段。
2.1. OOM
内存不够,数据太多就会抛出 OOM 的 Exeception,主要有 driver OOM 和 executor OOM 两种
1 | java.lang.OutOfMemoryError: Java heap space |
2.1.1.driver OOM
用户在 driver 端口生成大对象, 比如创建了一个大的集合数据结构
使用了collect 等操作,将所有 executor 的数据聚合到 driver 导致
一般是使用了collect 等操作,将所有 executor 的数据聚合到 driver 导致。尽量不要使用 collect操作即可。
2.1.2. executor OOM
数据倾斜导致内存溢出
数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,调用 repartition 重新分区
Reduce OOM
reduce task 去 map 端获取数据,reduce一边拉取数据一边聚合,reduce 端有一块聚合内存[executor memory * 0.2], 也就是这块内存不够
增加 reduce 聚合操作的内存的比例
增加 Executor memory 的大小 –executor-memory 5G
减少 reduce task 每次拉取的数据量 设置 spak.reducer.maxSizeInFlight 24m, 拉取的次数就多了,因此建立连接的次数增多,有可能会连接不上[正好赶上 map task 端进行GC]
map 过程产生大量对象导致内存溢出
这种溢出的原因是在单个 map 中产生了大量的对象导致的
例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000 个对象,这肯定很容易产生内存溢出的问题。
针对这种问题,在不增加内存的情况下,可以通过减少每个 Task 的大小,以便达到每个 Task 即使产生大量的对象 Executor 的内存也能够装得下。具体做法可以在会产生大量对象的 map 操作之前调用 repartition方法,分区成更小的块传入map。
例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)
shuffle 后内存溢出
shuffle 后单个文件过大导致内存溢出。
在 Spark 中,join,reduceByKey 这一类型的过程,都会有 shuffle 的过程,在 shuffle 的使用,需要传入一个 partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner
spark.default.parallelism 参数只对 HashPartitioner 有效,所以如果是别的 Partitioner 或者自己实现的 Partitioner 就不能使用 spark.default.parallelism 这个参数来控制 shuffle 的并发量了。如果是别的partitioner 导致的 shuffle 内存溢出,就需要从 partitioner 的代码增加 partitions 的数量
coalesce 调用导致内存溢出
因为 hdfs 中不适合存小文件,所以 Spark 计算后如果产生的文件太小,调用 coalesce 合并文件再存入 hdfs中。但会导致一个问题,例如在 coalesce 之前有100个文件,这也意味着能够有 100 个 Task,现在调用 coalesce(10),最后只产生10个文件,因为 coalesce 并不是 shuffle 操作,这意味着 coalesce 并不是先执行100个 Task,再将 Task 的执行结果合并成10个,而是从头到位只有10个 Task 在执行,原本100个文件是分开执行的,现在每个 Task 同时一次读取10个文件,使用的内存是原来的10倍,这导致了OOM。
解决这个问题的方法是令程序按先执行100个 Task 再将结果合并成10个文件,这个问题同样可以通过repartition 解决,调用 repartition(10)
standalone 模式下资源分配不均匀导致内存溢出
在 standalone 的模式下如果配置了
--total-executor-cores
和--executor-memory
这两个参数,但是没有配置--executor-cores
参数,有可能导致,每个 Executor 的 memory 是一样的,但是 cores 的数量不同,那么在 cores 数量多的 Executor 中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置
--executor-cores
或者spark.executor.cores
参数,确保 Executor 资源分配均匀
3. 总结
Spark 的存储内存和执行内存有着截然不同的管理方式
对于存储内存来说,Spark 用一个 **LinkedHashMap **来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成;
对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据, 在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制