/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.memory;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;
/**
* Manages the memory allocated by an individual task.
* <p>
* Most of the complexity in this class deals with encoding of off-heap addresses into 64-bit longs.
* In off-heap mode, memory can be directly addressed with 64-bit longs. In on-heap mode, memory is
* addressed by the combination of a base Object reference and a 64-bit offset within that object.
* This is a problem when we want to store pointers to data structures inside of other structures,
* such as record pointers inside hashmaps or sorting buffers. Even if we decided to use 128 bits
* to address memory, we can't just store the address of the base object since it's not guaranteed
* to remain stable as the heap gets reorganized due to GC.
* <p>
* Instead, we use the following approach to encode record pointers in 64-bit longs: for off-heap
* mode, just store the raw address, and for on-heap mode use the upper 13 bits of the address to
* store a "page number" and the lower 51 bits to store an offset within this page. These page
* numbers are used to index into a "page table" array inside of the MemoryManager in order to
* retrieve the base object.
* <p>
* This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the
* maximum size of a long[] array, allowing us to address 8192 * (2^31 - 1) * 8 bytes, which is
* approximately 140 terabytes of memory.
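* <p>
* As a purely illustrative sketch (the page number 5 and offset 1024 are hypothetical; the
* shift and mask mirror the encode/decode helpers defined below):
* <pre>{@code
* // Encode page number 5 with an in-page offset of 1024:
* long address = (5L << 51) | (1024L & 0x7FFFFFFFFFFFFL);
* // Decode the two components again:
* int pageNumber = (int) (address >>> 51);           // == 5
* long offsetInPage = address & 0x7FFFFFFFFFFFFL;    // == 1024
* }</pre>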
*/
public class TaskMemoryManager {
private static final Logger logger = LoggerFactory.getLogger(TaskMemoryManager.class);
/** The number of bits used to address the page table. */
private static final int PAGE_NUMBER_BITS = 13;
/** The number of bits used to encode offsets in data pages. */
@VisibleForTesting
static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51
/** The number of entries in the page table. */
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
/**
* Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
* (1L &lt;&lt; OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's
* maximum page size is limited by the maximum amount of data that can be stored in a long[]
* array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, we cap this at 17
* gigabytes.
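* For reference, the exact value of this cap is (2^31 - 1) * 8 = 17,179,869,176 bytes.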
*/
public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
/** Bit mask for the lower 51 bits of a long. */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
/**
* Similar to an operating system's page table, this array maps page numbers into base object
* pointers, allowing us to translate between the hashtable's internal 64-bit address
* representation and the baseObject+offset representation which we use to support both on- and
* off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
* When using an on-heap allocator, the entries in this map will point to pages' base objects.
* Entries are added to this map as new data pages are allocated.
*/
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
/**
* Bitmap for tracking allocated pages: a set bit means the corresponding page number is in use.
*/
private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
private final MemoryManager memoryManager;
private final long taskAttemptId;
/**
* Tracks whether we're on-heap or off-heap. For off-heap, we short-circuit most of these methods
* without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
* this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
*/
final MemoryMode tungstenMemoryMode;
/**
* Tracks spillable memory consumers.
*/
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;
/**
* The amount of memory that is acquired but not used.
*/
private volatile long acquiredButNotUsed = 0L;
/**
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
this.consumers = new HashSet<>();
}
/**
* Acquire N bytes of memory for a consumer. If there is not enough memory, this will call
* spill() on consumers to release more memory.
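* <p>
* A hypothetical usage sketch (the {@code taskMemoryManager} and {@code consumer} variables
* are assumed for illustration; real callers usually go through
* {@link MemoryConsumer#acquireMemory}):
* <pre>{@code
* long requested = 64L * 1024 * 1024;
* long granted = taskMemoryManager.acquireExecutionMemory(requested, consumer);
* if (granted < requested) {
*   // Proceed with less memory than requested, or release what was granted and fail.
* }
* }</pre>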
*
* @return number of bytes successfully granted (&lt;= N).
*/
public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
assert(required >= 0);
assert(requestingConsumer != null);
MemoryMode mode = requestingConsumer.getMode();
// If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
// memory here, then it may not make sense to spill since that would only end up freeing
// off-heap memory. This is subject to change, though, so it may be risky to make this
// optimization now in case we forget to undo it later when making changes.
synchronized (this) {
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
// Try to release memory from other consumers first; this reduces the frequency of
// spilling and avoids creating too many spilled files.
if (got < required) {
if (logger.isDebugEnabled()) {
logger.debug("Task {} needs to spill {} for {}", taskAttemptId,
Utils.bytesToString(required - got), requestingConsumer);
}
// We need to call spill() on consumers to free up more memory. We want to optimize for two
// things:
// * Minimize the number of spill calls, to reduce the number of spill files and avoid small
// spill files.
// * Avoid spilling more data than necessary - if we only need a little more memory, we may
// not want to spill as much data as possible. Many consumers spill more than the
// requested amount, so we can take that into account in our decisions.
// We use a heuristic that selects the smallest memory consumer with at least `required`
// bytes of memory in an attempt to balance these factors. It may work well if there are
// fewer larger requests, but can result in many small spills if there are many smaller
// requests.
// Build a map of consumers in order of memory usage to prioritize spilling. Assign the
// current consumer (if present) a nominal memory usage of 0 so that it is always last in
// priority order. The map will include all consumers that have previously acquired memory.
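// As a hypothetical illustration of this heuristic: with consumers currently using 10MB,
// 20MB and 40MB and 15MB still required, ceilingEntry(15MB) selects the 20MB consumer to
// spill; if 50MB were still required, no single consumer would be large enough, and we
// would fall back to spilling the largest (40MB) consumer first.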
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
for (MemoryConsumer c: consumers) {
if (c.getUsed() > 0 && c.getMode() == mode) {
long key = c == requestingConsumer ? 0 : c.getUsed();
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
// Iteratively spill consumers until we've freed enough memory or run out of consumers.
while (got < required && !sortedConsumers.isEmpty()) {
// Get the consumer using the least memory that is at least the remaining required memory.
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
sortedConsumers.ceilingEntry(required - got);
// No consumer has enough memory on its own, start with spilling the biggest consumer.
if (currentEntry == null) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
}
}
consumers.add(requestingConsumer);
if (logger.isDebugEnabled()) {
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
requestingConsumer);
}
return got;
}
}
/**
* Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
* spilling and then acquiring the freed memory. If no more memory can be spilled from
* `cList[idx]`, remove it from the list.
*
* @return number of bytes acquired (&lt;= requested)
* @throws RuntimeException if the task is interrupted
* @throws SparkOutOfMemoryError if an IOException occurs during spilling
*/
private long trySpillAndAcquire(
MemoryConsumer requestingConsumer,
long requested,
List<MemoryConsumer> cList,
int idx) {
MemoryMode mode = requestingConsumer.getMode();
MemoryConsumer consumerToSpill = cList.get(idx);
if (logger.isDebugEnabled()) {
logger.debug("Task {} trying to spill {} from {} for {}", taskAttemptId,
Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
}
try {
long released = consumerToSpill.spill(requested, requestingConsumer);
if (released > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Task {} spilled {} of requested {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), Utils.bytesToString(requested), consumerToSpill,
requestingConsumer);
}
// When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
// immediately notify other tasks that memory has been freed, and they may acquire the
// newly-freed memory before we have a chance to do so (SPARK-35486). Therefore we may
// not be able to acquire all the memory that was just spilled. In that case, we will
// try again in the next loop iteration.
return memoryManager.acquireExecutionMemory(requested, taskAttemptId, mode);
} else {
cList.remove(idx);
return 0;
}
} catch (ClosedByInterruptException e) {
// This is caused by the user killing the task (e.g. cancelling a speculative task).
logger.error("error while calling spill() on {}", e,
MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on {}", e,
MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + consumerToSpill + " : "
+ e.getMessage());
// checkstyle.on: RegexpSinglelineJava
}
}
/**
* Release N bytes of execution memory for a MemoryConsumer.
*/
public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
if (logger.isDebugEnabled()) {
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size),
consumer);
}
memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
}
/**
* Dump the memory usage of all consumers.
*/
public void showMemoryUsage() {
logger.info("Memory used in task {}",
MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
synchronized (this) {
long memoryAccountedForByConsumers = 0;
for (MemoryConsumer c: consumers) {
long totalMemUsage = c.getUsed();
memoryAccountedForByConsumers += totalMemUsage;
if (totalMemUsage > 0) {
logger.info("Acquired by {}: {}",
MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, c),
MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(totalMemUsage)));
}
}
long memoryNotAccountedFor =
memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
logger.info(
"{} bytes of memory were used by task {} but are not associated with specific consumers",
MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, memoryNotAccountedFor),
MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
logger.info(
"{} bytes of memory are used for execution and {} bytes of memory are used for storage",
MDC.of(LogKeys.EXECUTION_MEMORY_SIZE$.MODULE$, memoryManager.executionMemoryUsed()),
MDC.of(LogKeys.STORAGE_MEMORY_SIZE$.MODULE$, memoryManager.storageMemoryUsed()));
}
}
/**
* Return the page size in bytes.
*/
public long pageSizeBytes() {
return memoryManager.pageSizeBytes();
}
/**
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
* Returns `null` if there was not enough memory to allocate the page. May return a page that
* contains fewer bytes than requested, so callers should verify the size of returned pages.
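* <p>
* A hypothetical caller (the {@code requestedBytes} and {@code consumer} names are assumed
* for illustration) should handle both failure modes:
* <pre>{@code
* MemoryBlock page = taskMemoryManager.allocatePage(requestedBytes, consumer);
* if (page == null) {
*   // Not enough memory was available; spill elsewhere or fail gracefully.
* } else if (page.size() < requestedBytes) {
*   // A smaller page than requested was returned; adjust bookkeeping accordingly.
* }
* }</pre>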
*
* @throws TooLargePageException if the requested size exceeds {@link #MAXIMUM_PAGE_SIZE_BYTES}
*/
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
assert(consumer != null);
assert(consumer.getMode() == tungstenMemoryMode);
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new TooLargePageException(size);
}
long acquired = acquireExecutionMemory(size, consumer);
if (acquired <= 0) {
return null;
}
final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
MemoryBlock page = null;
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), trying again.",
MDC.of(LogKeys.PAGE_SIZE$.MODULE$, acquired));
// There is actually not enough memory: the real free memory is smaller than the
// MemoryManager thought, so we keep the acquired memory accounted for in acquiredButNotUsed.
synchronized (this) {
acquiredButNotUsed += acquired;
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
}
return page;
}
/**
* Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
*/
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert(allocatedPages.get(page.pageNumber));
pageTable[page.pageNumber] = null;
synchronized (this) {
allocatedPages.clear(page.pageNumber);
}
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
long pageSize = page.size();
// Clear the page number before passing the block to the MemoryAllocator's free().
// Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
// page has been inappropriately directly freed without calling TMM.freePage().
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize, consumer);
}
/**
* Given a memory page and offset within that page, encode this address into a 64-bit long.
* This address will remain valid as long as the corresponding page has not been freed.
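* <p>
* A minimal round-trip sketch (hypothetical usage; {@code page} is assumed to come from
* {@link TaskMemoryManager#allocatePage}):
* <pre>{@code
* long addr = taskMemoryManager.encodePageNumberAndOffset(page, page.getBaseOffset() + 16);
* Object base = taskMemoryManager.getPage(addr);         // base object, or null off-heap
* long offset = taskMemoryManager.getOffsetInPage(addr); // pair with base in UNSAFE calls
* }</pre>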
*
* @param page a data page allocated by {@link TaskMemoryManager#allocatePage}.
* @param offsetInPage an offset in this page which incorporates the base offset. In other words,
* this should be the value that you would pass as the base offset into an
* UNSAFE call (e.g. page.baseOffset() + something).
* @return an encoded page address.
*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
// encode. Due to our page size limitation, though, we can convert this into an offset that's
// relative to the page's base offset; this relative offset will fit in 51 bits.
offsetInPage -= page.getBaseOffset();
}
return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}
@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}
@VisibleForTesting
public static int decodePageNumber(long pagePlusOffsetAddress) {
return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
}
private static long decodeOffset(long pagePlusOffsetAddress) {
return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
}
/**
* Get the page associated with an address encoded by
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public Object getPage(long pagePlusOffsetAddress) {
if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
assert (page != null);
assert (page.getBaseObject() != null);
return page.getBaseObject();
} else {
return null;
}
}
/**
* Get the offset associated with an address encoded by
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public long getOffsetInPage(long pagePlusOffsetAddress) {
final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
return offsetInPage;
} else {
// In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
// converted the absolute address into a relative address. Here, we invert that operation:
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
assert (page != null);
return page.getBaseOffset() + offsetInPage;
}
}
/**
* Clean up all allocated memory and pages. Returns the number of bytes freed. A non-zero return
* value can be used to detect memory leaks.
*/
public long cleanUpAllAllocatedMemory() {
synchronized (this) {
for (MemoryConsumer c: consumers) {
if (c != null && c.getUsed() > 0) {
if (logger.isDebugEnabled()) {
// In case of a failed task, it's normal to see leaked memory
logger.debug("unreleased {} memory from {}", Utils.bytesToString(c.getUsed()), c);
}
}
}
consumers.clear();
for (MemoryBlock page : pageTable) {
if (page != null) {
if (logger.isDebugEnabled()) {
logger.debug("unreleased page: {} in task {}", page, taskAttemptId);
}
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page);
}
}
Arrays.fill(pageTable, null);
}
// Release the memory acquired but not used by any consumer (acquired for pages in Tungsten mode).
memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}
/**
* Returns the memory consumption, in bytes, for the current task.
*/
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
}
/**
* Returns the Tungsten memory mode.
*/
public MemoryMode getTungstenMemoryMode() {
return tungstenMemoryMode;
}
}