log4j.logger.org.apache.spark.memory.TaskMemoryManager=TRACE
TaskMemoryManager
TaskMemoryManager manages the memory allocated to an individual task.
TaskMemoryManager assumes that:
- The number of bits to address pages (aka PAGE_NUMBER_BITS) is 13
- The number of bits to encode offsets in data pages (aka OFFSET_BITS) is 51 (i.e. 64 bits - PAGE_NUMBER_BITS)
- The number of entries in the page table and allocated pages (aka PAGE_TABLE_SIZE) is 8192 (i.e. 1 << PAGE_NUMBER_BITS)
- The maximum page size (aka MAXIMUM_PAGE_SIZE_BYTES) is 17179869176 bytes, just under 16 GiB (i.e. ((1L << 31) - 1) * 8L)
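The arithmetic behind these constants can be checked with a small sketch. The constant names and values come from the list above; the class name PageAddressing and the helpers encode, decodePageNumber and decodeOffset are hypothetical, written only to illustrate how a 13-bit page number and a 51-bit offset could share one 64-bit long. It is not a copy of Spark's code.

```java
final class PageAddressing {
    // Values from the list of assumptions.
    static final int PAGE_NUMBER_BITS = 13;
    static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;              // 51
    static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;          // 8192
    static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L; // 17179869176

    // Hypothetical helper: mask with the lower 51 bits set.
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

    // Hypothetical helper: page number in the upper 13 bits, offset in the lower 51.
    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & OFFSET_MASK);
    }

    // Unsigned shift, so page numbers with the top bit set decode correctly.
    static int decodePageNumber(long address) {
        return (int) (address >>> OFFSET_BITS);
    }

    static long decodeOffset(long address) {
        return address & OFFSET_MASK;
    }
}
```

For example, PageAddressing.encode(8191, 12345L) round-trips through decodePageNumber and decodeOffset back to 8191 and 12345.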
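Name | Description |
---|---|
pageTable | The array of size PAGE_TABLE_SIZE that holds the allocated pages. When allocating a page, allocatePage stores the MemoryBlock at the index given by its page number. |
allocatedPages | Collection of flags (one per page number). TIP: When allocatePage is called, it will record the page in the registry by setting the bit at the specified index (that corresponds to the allocated page) to true. |
consumers | Set of MemoryConsumers |
acquiredButNotUsed | The size of memory allocated but not used. |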
Note: TaskMemoryManager is used to create a TaskContextImpl.
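Tip: Enable TRACE logging level for the org.apache.spark.memory.TaskMemoryManager logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.memory.TaskMemoryManager=TRACE
Refer to Logging.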
Caution: FIXME How to trigger the messages in the logs? What to execute to have them printed out to the logs?
cleanUpAllAllocatedMemory Method
cleanUpAllAllocatedMemory clears the page table.
Caution: FIXME
All recorded consumers are queried for the size of used memory. If the memory used is greater than 0, the following WARN message is printed out to the logs:
WARN TaskMemoryManager: leak [bytes] memory from [consumer]
The consumers collection is then cleared.
MemoryManager.releaseExecutionMemory is executed to release the memory that is not used by any consumer.
Before cleanUpAllAllocatedMemory returns, it calls MemoryManager.releaseAllExecutionMemoryForTask and returns the result of that call.
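The cleanup sequence described above could be sketched as follows. This is a simplified model, not Spark's code: Consumer and Manager are hypothetical stand-ins for MemoryConsumer and MemoryManager, the warnings list stands in for the WARN log, and treating acquiredButNotUsed as "the memory that is not used by any consumer" is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CleanupSketch {
    // Hypothetical stand-in for MemoryConsumer: only the used-memory query.
    interface Consumer { long getUsed(); }

    // Hypothetical stand-in for the two MemoryManager calls named above.
    interface Manager {
        void releaseExecutionMemory(long bytes);
        long releaseAllExecutionMemoryForTask();
    }

    final Object[] pageTable = new Object[8192];
    final Set<Consumer> consumers = new HashSet<>();
    final List<String> warnings = new ArrayList<>(); // stands in for the WARN log
    long acquiredButNotUsed = 0L;

    long cleanUpAllAllocatedMemory(Manager manager) {
        Arrays.fill(pageTable, null); // clear the page table
        for (Consumer c : consumers) {
            long used = c.getUsed();
            if (used > 0) {
                warnings.add("leak " + used + " memory from " + c);
            }
        }
        consumers.clear();
        // Assumption: the memory "not used by any consumer" is acquiredButNotUsed.
        manager.releaseExecutionMemory(acquiredButNotUsed);
        // The result of this call becomes the return value.
        return manager.releaseAllExecutionMemoryForTask();
    }
}
```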
Caution: FIXME Image with the interactions to MemoryManager.
pageSizeBytes Method
Caution: FIXME
releaseExecutionMemory Method
Caution: FIXME
showMemoryUsage Method
Caution: FIXME
Creating TaskMemoryManager Instance
TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId)
A single TaskMemoryManager manages the memory of a single task (by the task’s taskAttemptId).
Note: Although the constructor parameter taskAttemptId refers to a task’s attempt id, it is really a taskId. It should be changed perhaps?
When called, the constructor uses the input MemoryManager to know whether it is in Tungsten memory mode (disabled by default) and saves the MemoryManager and taskAttemptId for later use.
It also initializes the internal consumers to be empty.
Note: When a TaskRunner starts running, it creates a new instance of TaskMemoryManager for the task by taskId. It then assigns the TaskMemoryManager to the individual task before it runs.
Acquiring Execution Memory: acquireExecutionMemory Method
long acquireExecutionMemory(long required, MemoryConsumer consumer)
acquireExecutionMemory allocates up to required size of memory for consumer. When it cannot get all of the required memory, it calls spill on the other consumers and, if that is still not enough, on the requesting consumer itself. Finally, acquireExecutionMemory returns the size of the memory it acquired.
Note: acquireExecutionMemory synchronizes on the TaskMemoryManager itself, so no other synchronized call on the same object can complete until it returns.
Note: MemoryConsumer knows its mode (on- or off-heap).
acquireExecutionMemory first calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode).
Tip: TaskMemoryManager is a mere wrapper of MemoryManager to track consumers?
Caution: FIXME
When the memory obtained is less than requested (by required), acquireExecutionMemory requests all consumers to release memory (by spilling it to disk).
Note: acquireExecutionMemory requests memory from consumers that work in the same mode, except the requesting one.
You may see the following DEBUG message when spill released some memory:
DEBUG Task [taskAttemptId] released [bytes] from [consumer] for [consumer]
acquireExecutionMemory then calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) again (as it did at the beginning).
It repeats the memory acquisition until it gets enough memory or there are no more consumers to request spill from.
You may also see the following ERROR message in the logs when there is an error while requesting spill (an OutOfMemoryError follows):
ERROR error while calling spill() on [consumer]
If the earlier spill on the consumers did not work out and there is still memory to be acquired, acquireExecutionMemory requests the input consumer (the one that requested more memory in the first place) to spill memory to disk.
If the consumer releases some memory, you should see the following DEBUG message in the logs:
DEBUG Task [taskAttemptId] released [bytes] from itself ([consumer])
acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) once more.
Note: memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) can be called at up to three points, i.e. at the very beginning, after a spill of each other consumer, and after the requesting consumer spills itself.
It records the consumer in consumers registry.
You should see the following DEBUG message in the logs:
DEBUG Task [taskAttemptId] acquired [bytes] for [consumer]
Note: acquireExecutionMemory is called when a MemoryConsumer tries to acquire memory and by allocatePage.
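The three acquisition points can be sketched with a toy model. Everything here is a simplification under stated assumptions: Pool is a hypothetical stand-in for MemoryManager with a fixed number of free bytes, Consumer is a hypothetical stand-in for MemoryConsumer whose spill returns memory to the pool, memory modes are ignored, and only the still-missing amount is requested on each retry. It is a sketch of the control flow, not Spark's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

final class AcquireSketch {
    // Hypothetical stand-in for MemoryManager: a fixed pool of free bytes.
    static final class Pool {
        long free;
        Pool(long free) { this.free = free; }
        long acquire(long required) {
            long granted = Math.min(required, free);
            free -= granted;
            return granted;
        }
        void release(long bytes) { free += bytes; }
    }

    // Hypothetical stand-in for MemoryConsumer.
    static final class Consumer {
        final Pool pool;
        long used = 0L;
        Consumer(Pool pool) { this.pool = pool; }
        // Gives up to size bytes back to the pool and returns the amount released.
        long spill(long size) {
            long released = Math.min(size, used);
            used -= released;
            pool.release(released);
            return released;
        }
    }

    final Pool pool;
    final Set<Consumer> consumers = new HashSet<>();

    AcquireSketch(Pool pool) { this.pool = pool; }

    synchronized long acquireExecutionMemory(long required, Consumer requester) {
        // 1. First attempt, before anybody spills.
        long got = pool.acquire(required);

        // 2. Ask the other consumers to spill and retry after each release.
        if (got < required) {
            for (Consumer c : new ArrayList<>(consumers)) {
                if (c != requester && c.used > 0 && c.spill(required - got) > 0) {
                    got += pool.acquire(required - got);
                    if (got >= required) break;
                }
            }
        }

        // 3. Last resort: the requesting consumer spills its own data.
        if (got < required && requester.spill(required - got) > 0) {
            got += pool.acquire(required - got);
        }

        consumers.add(requester);
        requester.used += got; // simplification: the sketch tracks usage here
        return got;
    }
}
```

With a pool of 100 bytes, a first consumer that holds 80 bytes is asked to spill 30 of them when a second consumer requests 50.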
Getting Page: getPage Method
Caution: FIXME
Getting Page Offset: getOffsetInPage Method
Caution: FIXME
Freeing Memory Page: freePage Method
Caution: FIXME
Allocating Memory Block for Tungsten Consumers: allocatePage Method
MemoryBlock allocatePage(long size, MemoryConsumer consumer)
Note: It only handles Tungsten Consumers, i.e. MemoryConsumers in tungstenMemoryMode mode.
allocatePage allocates a block of memory (aka page) of at most MAXIMUM_PAGE_SIZE_BYTES bytes.
It checks size against the internal MAXIMUM_PAGE_SIZE_BYTES maximum size. If it is greater than the maximum size, the following IllegalArgumentException is thrown:
Cannot allocate a page with more than [MAXIMUM_PAGE_SIZE_BYTES] bytes
It then acquires execution memory (for the input size and consumer).
It finishes by returning null when no execution memory could be acquired.
With the execution memory acquired, it finds the smallest unallocated page index and records the page number (using allocatedPages registry).
If the index is PAGE_TABLE_SIZE or higher, releaseExecutionMemory(acquired, consumer) is called and then the following IllegalStateException is thrown:
Have already allocated a maximum of [PAGE_TABLE_SIZE] pages
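Finding the smallest unallocated page index can be illustrated with a bit set. This sketch assumes the allocatedPages registry behaves like a java.util.BitSet; the class PageNumberSketch and its methods are hypothetical names, and the release of the acquired execution memory before the exception is only indicated by a comment.

```java
import java.util.BitSet;

final class PageNumberSketch {
    static final int PAGE_TABLE_SIZE = 1 << 13; // 8192

    // Assumption: the allocatedPages registry is modelled as a BitSet.
    final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

    // Reserves and returns the smallest page number that is not in use.
    int reservePageNumber() {
        int pageNumber = allocatedPages.nextClearBit(0);
        if (pageNumber >= PAGE_TABLE_SIZE) {
            // The described method releases the acquired execution memory here.
            throw new IllegalStateException(
                "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
        }
        allocatedPages.set(pageNumber);
        return pageNumber;
    }

    // Marks a page number as free again (the bit becomes false).
    void releasePageNumber(int pageNumber) {
        allocatedPages.clear(pageNumber);
    }
}
```

Because nextClearBit(0) always scans from the start, a released page number is the next one to be reused.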
It then attempts to allocate a MemoryBlock from Tungsten MemoryAllocator (calling memoryManager.tungstenMemoryAllocator().allocate(acquired)).
Caution: FIXME What is MemoryAllocator?
When successful, MemoryBlock gets assigned pageNumber and it gets added to the internal pageTable registry.
You should see the following TRACE message in the logs:
TRACE Allocate page number [pageNumber] ([acquired] bytes)
The page is returned.
If an OutOfMemoryError is thrown when allocating a MemoryBlock page, the following WARN message is printed out to the logs:
WARN Failed to allocate a page ([acquired] bytes), try again.
The acquired memory is then added to acquiredButNotUsed and pageNumber is cleared in allocatedPages (i.e. the bit at index pageNumber is set to false).
Caution: FIXME Why is the code tracking acquiredButNotUsed?
Another allocatePage attempt is recursively tried.
Caution: FIXME Why is there a hope for being able to allocate a page?
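The retry path after a failed allocation can be sketched as below. This only shows the mechanics described above and does not try to answer the open questions: Allocator is a hypothetical stand-in for the Tungsten MemoryAllocator, pages are plain byte arrays, the execution memory is assumed to be granted in full, and the method returns the page number rather than a MemoryBlock.

```java
import java.util.BitSet;

final class AllocateRetrySketch {
    // Hypothetical stand-in for the Tungsten MemoryAllocator.
    interface Allocator { byte[] allocate(int size); }

    final BitSet allocatedPages = new BitSet();
    final byte[][] pageTable = new byte[8192][];
    long acquiredButNotUsed = 0L;
    final Allocator allocator;

    AllocateRetrySketch(Allocator allocator) { this.allocator = allocator; }

    // Returns the page number under which the new page was stored.
    // Note: an allocator that always fails would recurse without end.
    int allocatePage(int size) {
        int acquired = size; // assumption: execution memory was granted in full
        int pageNumber = allocatedPages.nextClearBit(0);
        allocatedPages.set(pageNumber);
        try {
            pageTable[pageNumber] = allocator.allocate(acquired);
            return pageNumber;
        } catch (OutOfMemoryError e) {
            acquiredButNotUsed += acquired;   // keep track of the unusable memory
            allocatedPages.clear(pageNumber); // the page number is free again
            return allocatePage(size);        // recursive second attempt
        }
    }
}
```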