统一内存管理和静态内存管理的实现都位于core模块的memory包(org/apache/spark/memory)路径下面。
Spark1.6.0版本之前提供的是静态内存管理,实现类StaticMemoryManager;Spark1.6.0版本及之后默认的是统一内存管理,实现类是UnifiedMemoryManager,也向前兼容静态内存管理。
注意:Spark 3.0及之后版本,只支持统一内存管理,遗弃了静态内存管理。
Spark是使用静态内存关了还是统一内存管理,是在初始化SparkContext的时候,创建Spark执行环境时指定的:
SparkEnv对象create()函数对应实现:
private def create( conf: SparkConf, executorId: String, bindAddress: String, advertiseAddress: String, port: Option[Int], isLocal: Boolean, numUsableCores: Int, ioEncryptionKey: Option[Array[Byte]], listenerBus: LiveListenerBus = null, mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { ... val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) val memoryManager: MemoryManager = if (useLegacyMemoryManager) { new StaticMemoryManager(conf, numUsableCores) } else { UnifiedMemoryManager(conf, numUsableCores) } ... }设置参数"spark.memory.useLegacyMode"为true,便可以使用静态内存管理,默认为false。
不管是静态内存管理还是统一内存管理,Spark内存都是被划分为执行内存区域(Execution Memory)和存储内存区域(Storage Memory)。执行内存主要用在shuffle、join、sort和aggregation的计算,而存储内存用来缓存和传输Spark应用程序中使用的数据。
Spark最初采用的是静态内存管理机制,存储内存(Storage Memory)和执行内存(Execution Memory)都是固定的,只能在Spark应用程序启动的时候分别通过’spark.storage.memoryFraction’和’spark.shuffle.memoryFraction’两个配置参数进行指定。
Storage区域内存划分的源码如下:
/** * Storage区域可用的总内存 */ private def getMaxStorageMemory(conf: SparkConf): Long = { //生产环境一般不会使用参数"spark.testing.memory", 那么返回的就是JVM最大内存. val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) //分给spark内存缓存的内存比率 val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) //安全系数 val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) //最终可分给spark内存缓存的内存大小 (systemMaxMemory * memoryFraction * safetyFraction).toLong }Execution区域内存划分的源码如下:
/** * Execution区域可用的总内存 */ private def getMaxExecutionMemory(conf: SparkConf): Long = { //生产环境一般不会使用参数"spark.testing.memory", 那么返回的就是JVM最大内存. val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) ... //分配给shuffle操作(aggregation、cogroups等)的内存比率 val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) //内存安全系数 val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) //分配给shuffle操作的内存大小 (systemMaxMemory * memoryFraction * safetyFraction).toLong }通过静态内存管理的源码,我们可以得到存储内存和执行内存的计算公式:
存储内存:systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction执行内存:systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction假设,Spark应用程序为每个Executor分配的JVM堆内存为10G(也就是,参数spark.executor.memory=10G),那么最终能分配给Storage区域的内存为10G * 0.6 * 0.9 = 5.4G,分配给Execution区域的内存大小为10G * 0.2 * 0.8 = 1.6G。
Spark统一内存管理模型简图:
Execution和Storage所共享区域的内存大小由参数’spark.memory.fraction’决定(默认是0.6,即(总的堆大小 - 300M) * 0.6)。*在这个共享区域中,Storage占用的内存大小由’spark.memory.storageFraction’决定(默认是0.5),也就是说,Storage区域默认为堆空间的0.6 * 0.5 = 0.3。默认情况下,Execution和Storage两个区域的内存大小是相等的。
与静态内存管理不同的是,在统一内存管理模型中,Execution区域和Storage区域间的边界线并不是固定死的,两者之间可以认为存在着一个可以滑动的边界线,可以动态地相互借用对方的内存。
在统一内存管理类UnifiedMemoryManager中,存储内存和执行内存的初始化没有分别由两个函数实现,而是一起放到了getMaxMemroy()方法中。存储内存和执行内存的共享区域内存划分源码如下:
//预留给系统的内存(300M). private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 /** * Execution区域和Storage区域总的可用内存. */ private def getMaxMemory(conf: SparkConf): Long = { //生产环境一般不会使用参数"spark.testing.memory", 那么返回的就是JVM最大内存. val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) //系统预留内存 val reservedMemory = conf.getLong("spark.testing.reservedMemory", if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES) // 最小系统内存限制 val minSystemMemory = (reservedMemory * 1.5).ceil.toLong //系统内存systemMemory不能小于minSystemMemory if (systemMemory < minSystemMemory) { throw new IllegalArgumentException(s"System memory $systemMemory must " + s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " + s"option or spark.driver.memory in Spark configuration.") } //参数"spark.executor.memory"指定的是JVM Heap内存, 不能小于minSystemMemory if (conf.contains("spark.executor.memory")) { val executorMemory = conf.getSizeAsBytes("spark.executor.memory") if (executorMemory < minSystemMemory) { throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + s"$minSystemMemory. Please increase executor memory using the " + s"--executor-memory option or spark.executor.memory in Spark configuration.") } } //可用内存 val usableMemory = systemMemory - reservedMemory //Execution区域和Storage区域占usableMemory的比率 val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6) // Execution区域和Storage区域可使用的总内存大小 (usableMemory * memoryFraction).toLong }默认情况下,Spark是不使用非堆内存的,我们可以通过参数"spark.memory.offHeap.enabled"=true启用非堆内存。另外还要设置"spark.memory.offHeap.size"(单位为bytes)来指定要使用的非堆内存的大小。
静态内存管理中执行内存支持使用非堆内存,存储内存不支持使用非堆内存;统一内存管理中存储内存和执行内存都使用非堆内存。
不管是静态内存管理还是统一内存管理,开启非堆内存时,所申请的非堆内存中,两者都是各占一半。唯一的区别就是,静态内存管理中存储内存和执行内存都是固定,而统一内存管理中存储内存和执行内存是可以相互借用对方内存的。
在静态内存管理(StaticMemoryManager)和统一内存管理(UnifiedMemoryManager)的源码中,我们除了acquireStorageMemory()和acquireExecutionMemory()这两个函数之外还有一个函数,就是acquireUnrollMemory(),那么什么是Unroll内存呢?又是用来做什么的呢?
在Spark的应用程序中,如果我们对RDD/Dataset进行了缓存,那么RDD/Dataset中每个partition会以block的形式缓存到存储内存中,同一个partition中的数据在内存中并不是连续的。那么,为了将partition由不连续的存储空间转换成连续的存储空间,就需要申请额外的内存空间,这些申请的内存空间就称为Unroll内存。
UnifiedMemoryManager类中acquireUnrollMemory()函数实现:
override def acquireUnrollMemory( blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean = synchronized { acquireStorageMemory(blockId, numBytes, memoryMode) }从acquireUnrollMemory()函数中可以看出,它就是在acquireStorageMemory()上进行了一层包装而已。所以,它申请的就是一块存储内存中的内存空间。
