MemoryManager 统一管理 Execution 内存和 Storage 内存。
Execution 内存包括 shuffles, joins, sorts and aggregations。
Storage 内存包括 caching 和传播内部的数据结构,如 broadcast 的对象等。
在 UnifiedMemoryManager 中,Execution 和 Storage 内存可以共享堆内存,这两部分可以占用的总内存为 (the total heap space - 300MB) * spark.memory.fraction
(默认 0.6)。 Storage 内存默认占其中的 0.5,即大约堆内存 * 0.6 * 0.5。
当 Execution 内存占用的内存很少时,Storage 内存可以占用 Execution 内存。当 Execution 线程请求内存时,缓存的数据块要从内存中清除来释放空间。
类似的,Execution 可以向 Storage 内存借用空间,但是当 Storage 空间不足时,执行内存不会清理内存。也就是说当执行内存占用了大量空间是,cache 数据块可能失败。
当 Storage 内存占用比较多时,如果 Execution 内存空间不足,则最多从 Storage 内存释放它多占的空间,Storage 会释放到总内存的50%。
Execution 内存和 Storage 内存又分为 onHeap 和 offHeap,所以有以下4种 memory pool。文章来源:https://www.toymoban.com/news/detail-637382.html
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
Storage 可以使用的最大内存是动态变化的,如下示例:文章来源地址https://www.toymoban.com/news/detail-637382.html
override def maxOnHeapStorageMemory: Long = synchronized {
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
override def maxOffHeapStorageMemory: Long = synchronized {
maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}
- pageSizeBytes
val minPageSize = 1L * 1024 * 1024 // 1MB
val maxPageSize = 64L * minPageSize // 64MB
pageSizeBytes 为对应的执行内存/cores/16,并在以上两个范围之间。
- setMemoryStore
仅Storage memory pool 有 memory store.
final def setMemoryStore(store: MemoryStore): Unit = synchronized {
onHeapStorageMemoryPool.setMemoryStore(store)
offHeapStorageMemoryPool.setMemoryStore(store)
}
到了这里,关于Spark 3.2 MemoryManager源代码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!