UnifiedMemoryManager

UnifiedMemoryManager is the default MemoryManager with onHeapStorageMemory being ??? and onHeapExecutionMemory being ???

Calculate Maximum Memory to Use — getMaxMemory Method

getMaxMemory(conf: SparkConf): Long

getMaxMemory calculates the maximum memory to use for execution and storage.

// local mode with --conf spark.driver.memory=2g
scala> sc.getConf.getSizeAsBytes("spark.driver.memory")
res0: Long = 2147483648

scala> val systemMemory = Runtime.getRuntime.maxMemory

// fixed amount of memory for non-storage, non-execution purposes
val reservedMemory = 300 * 1024 * 1024

// minimum system memory required
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

val usableMemory = systemMemory - reservedMemory

val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
scala> val maxMemory = (usableMemory * memoryFraction).toLong
maxMemory: Long = 956615884

import org.apache.spark.network.util.JavaUtils
scala> JavaUtils.byteStringAsMb(maxMemory + "b")
res1: Long = 912

getMaxMemory reads the maximum amount of memory that the Java virtual machine will attempt to use and decrements it by reserved system memory (for non-storage and non-execution purposes).

getMaxMemory makes sure that the following requirements are met:

  1. System memory is not smaller than about 1,5 of the reserved system memory.

  2. spark.executor.memory is not smaller than about 1,5 of the reserved system memory.

Ultimately, getMaxMemory returns spark.memory.fraction of the maximum amount of memory for the JVM (minus the reserved system memory).

Caution
FIXME omnigraffle it.

Creating UnifiedMemoryManager Instance

class UnifiedMemoryManager(
  conf: SparkConf,
  val maxHeapMemory: Long,
  onHeapStorageRegionSize: Long,
  numCores: Int)

UnifiedMemoryManager requires a SparkConf and the following values:

  • maxHeapMemory — the maximum on-heap memory to manage. It is assumed that onHeapExecutionMemoryPool with onHeapStorageMemoryPool is exactly maxHeapMemory.

  • onHeapStorageRegionSize

  • numCores

UnifiedMemoryManager makes sure that the sum of offHeapExecutionMemoryPool and offHeapStorageMemoryPool pool sizes is exactly maxOffHeapMemory.

Caution
FIXME Describe the pools

apply Factory Method

apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager

Internally, apply calculates the maximum memory to use (given conf). It then creates a UnifiedMemoryManager with the following values:

  1. maxHeapMemory being the maximum memory just calculated.

  2. onHeapStorageRegionSize being spark.memory.storageFraction of maximum memory.

  3. numCores as configured.

Note
apply is used when SparkEnv is created.

acquireStorageMemory Method

acquireStorageMemory(
  blockId: BlockId,
  numBytes: Long,
  memoryMode: MemoryMode): Boolean

acquireStorageMemory has two modes of operation per memoryMode, i.e. MemoryMode.ON_HEAP or MemoryMode.OFF_HEAP, for execution and storage pools, and the maximum amount of memory to use.

Caution
FIXME Where are they used?
Note
acquireStorageMemory is a part of the MemoryManager Contract.

In MemoryMode.ON_HEAP, onHeapExecutionMemoryPool, onHeapStorageMemoryPool, and maxOnHeapStorageMemory are used.

In MemoryMode.OFF_HEAP, offHeapExecutionMemoryPool, offHeapStorageMemoryPool, and maxOffHeapMemory are used.

Caution
FIXME What is the difference between them?

It makes sure that the requested number of bytes numBytes (for a block to store) fits the available memory. If it is not the case, you should see the following INFO message in the logs and the method returns false.

INFO Will not store [blockId] as the required space ([numBytes] bytes) exceeds our memory limit ([maxMemory] bytes)

If the requested number of bytes numBytes is greater than memoryFree in the storage pool, acquireStorageMemory will attempt to use the free memory from the execution pool.

Note
The storage pool can use the free memory from the execution pool.

It will take as much memory as required to fit numBytes from memoryFree in the execution pool (up to the whole free memory in the pool).

Ultimately, acquireStorageMemory requests the storage pool for numBytes for blockId.

Note

acquireStorageMemory is used when MemoryStore acquires storage memory to putBytes or putIteratorAsValues and putIteratorAsBytes.

It is also used internally when UnifiedMemoryManager acquires unroll memory.

acquireUnrollMemory Method

Note
acquireUnrollMemory is a part of the MemoryManager Contract.

acquireUnrollMemory simply forwards all the calls to acquireStorageMemory.

acquireExecutionMemory Method

acquireExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Long

acquireExecutionMemory does…​FIXME

Internally, acquireExecutionMemory varies per MemoryMode, i.e. ON_HEAP and OFF_HEAP.

Table 1. acquireExecutionMemory and MemoryMode
ON_HEAP OFF_HEAP

executionPool

onHeapExecutionMemoryPool

offHeapExecutionMemoryPool

storagePool

onHeapStorageMemoryPool

offHeapStorageMemoryPool

storageRegionSize

onHeapStorageRegionSize <1>

offHeapStorageMemory

maxMemory

maxHeapMemory <2>

maxOffHeapMemory

Note
acquireExecutionMemory is a part of the MemoryManager Contract.
Caution
FIXME

maxOnHeapStorageMemory Method

maxOnHeapStorageMemory: Long

maxOnHeapStorageMemory is the difference between maxHeapMemory of the UnifiedMemoryManager and the memory currently in use in onHeapExecutionMemoryPool execution memory pool.

Note
maxOnHeapStorageMemory is a part of the MemoryManager Contract.

Settings

Table 2. Spark Properties
Spark Property Default Value Description

spark.memory.fraction

0.6

Fraction of JVM heap space used for execution and storage.

spark.memory.storageFraction

0.5

spark.testing.memory

Java’s Runtime.getRuntime.maxMemory

System memory

spark.testing.reservedMemory

300M or 0 (with spark.testing enabled)

results matching ""

    No results matching ""