/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.unsafe.map;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closeables;
import org.apache.spark.SparkEnv;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.MDC;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;

/**
* An append-only hash map where keys and values are contiguous regions of bytes.
*
* This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
* which is guaranteed to exhaust the space.
*
* The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
* probably be using sorting instead of hashing for better cache locality.
*
 * Under the hood, keys and values are stored together in the following format:
* First uaoSize bytes: len(k) (key length in bytes) + len(v) (value length in bytes) + uaoSize
* Next uaoSize bytes: len(k)
* Next len(k) bytes: key data
* Next len(v) bytes: value data
* Last 8 bytes: pointer to next pair
*
 * This means the first uaoSize bytes store the length of the entire record (key + value +
 * uaoSize). This format is compatible with
 * {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter}, so we can pass
 * records from this map directly into the sorter to sort records in place.
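 *
 * As a minimal usage sketch (assuming the caller supplies a TaskMemoryManager and word-aligned
 * key/value buffers), the lookup-then-append pattern looks like:
 * <pre>
 *   BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, 64, pageSizeBytes);
 *   Location loc = map.lookup(keyBase, keyOffset, keyLength);
 *   if (!loc.isDefined()) {
 *     if (!loc.append(keyBase, keyOffset, keyLength, valueBase, valueOffset, valueLength)) {
 *       // handle failure to acquire memory (by spilling, for example)
 *     }
 *   }
 *   map.free();
 * </pre>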
*/
public final class BytesToBytesMap extends MemoryConsumer {
private static final Logger logger = LoggerFactory.getLogger(BytesToBytesMap.class);
private static final HashMapGrowthStrategy growthStrategy = HashMapGrowthStrategy.DOUBLING;
private final TaskMemoryManager taskMemoryManager;
/**
* A linked list for tracking all allocated data pages so that we can free all of our memory.
*/
private final LinkedList<MemoryBlock> dataPages = new LinkedList<>();
/**
* The data page that will be used to store keys and values for new hashtable entries. When this
* page becomes full, a new page will be allocated and this pointer will change to point to that
* new page.
*/
private MemoryBlock currentPage = null;
/**
* Offset into `currentPage` that points to the location where new data can be inserted into
* the page. This does not incorporate the page's base offset.
*/
private long pageCursor = 0;
/**
* The maximum number of keys that BytesToBytesMap supports. The hash table has to be
* power-of-2-sized and its backing Java array can contain at most (1 &lt;&lt; 30) elements,
* since that's the largest power-of-2 that's less than Integer.MAX_VALUE. We need two long array
* entries per key, giving us a maximum capacity of (1 &lt;&lt; 29).
*/
public static final int MAX_CAPACITY = (1 << 29);
// This choice of page table size and page size means that we can address up to 500 gigabytes
// of memory.
/**
   * A single array that stores the key pointers and their hashcodes.
   *
   * Position {@code 2 * i} in the array is used to track a pointer to the key at index {@code i},
   * while position {@code 2 * i + 1} in the array holds the key's full 32-bit hashcode.
*/
@Nullable private LongArray longArray;
// TODO: we're wasting 32 bits of space here; we can probably store fewer bits of the hashcode
// and exploit word-alignment to use fewer bits to hold the address. This might let us store
// only one long per map entry, increasing the chance that this array will fit in cache at the
// expense of maybe performing more lookups if we have hash collisions. Say that we stored only
// 27 bits of the hashcode and 37 bits of the address. 37 bits is enough to address 1 terabyte
// of RAM given word-alignment. If we use 13 bits of this for our page table, that gives us a
// maximum page size of 2^24 * 8 = ~134 megabytes per page. This change will require us to store
// full base addresses in the page table for off-heap mode so that we can reconstruct the full
// absolute memory addresses.
/**
* Whether or not the longArray can grow. We will not insert more elements if it's false.
*/
private boolean canGrowArray = true;
private final double loadFactor;
/**
* The size of the data pages that hold key and value data. Map entries cannot span multiple
* pages, so this limits the maximum entry size.
*/
private final long pageSizeBytes;
/**
* Number of keys defined in the map.
*/
private int numKeys;
/**
* Number of values defined in the map. A key could have multiple values.
*/
private int numValues;
/**
* The map will be expanded once the number of keys exceeds this threshold.
*/
private int growthThreshold;
/**
* Mask for truncating hashcodes so that they do not exceed the long array's size.
* This is a strength reduction optimization; we're essentially performing a modulus operation,
* but doing so with a bitmask because this is a power-of-2-sized hash map.
*/
private int mask;
/**
* Return value of {@link BytesToBytesMap#lookup(Object, long, int)}.
*/
private final Location loc;
private long numProbes = 0L;
private long numKeyLookups = 0L;
private long peakMemoryUsedBytes = 0L;
private final int initialCapacity;
private final BlockManager blockManager;
private final SerializerManager serializerManager;
private volatile MapIterator destructiveIterator = null;
private LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
SerializerManager serializerManager,
int initialCapacity,
double loadFactor,
long pageSizeBytes) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
this.taskMemoryManager = taskMemoryManager;
this.blockManager = blockManager;
this.serializerManager = serializerManager;
this.loadFactor = loadFactor;
this.loc = new Location();
this.pageSizeBytes = pageSizeBytes;
if (initialCapacity <= 0) {
throw new IllegalArgumentException("Initial capacity must be greater than 0");
}
if (initialCapacity > MAX_CAPACITY) {
throw new IllegalArgumentException(
"Initial capacity " + initialCapacity + " exceeds maximum capacity of " + MAX_CAPACITY);
}
if (pageSizeBytes > TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES) {
throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " +
TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
}
this.initialCapacity = initialCapacity;
allocate(initialCapacity);
}
public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
int initialCapacity,
long pageSizeBytes) {
this(
taskMemoryManager,
SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
initialCapacity,
// In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5.
0.5,
pageSizeBytes);
}
/**
* Returns the number of keys defined in the map.
*/
public int numKeys() { return numKeys; }
/**
* Returns the number of values defined in the map. A key could have multiple values.
*/
public int numValues() { return numValues; }
public final class MapIterator implements Iterator<Location> {
private int numRecords;
private final Location loc;
private MemoryBlock currentPage = null;
private int recordsInPage = 0;
private Object pageBaseObject;
private long offsetInPage;
    // Whether this iterator is destructive. When true, it frees each page as it moves onto the
    // next one.
private boolean destructive = false;
private UnsafeSorterSpillReader reader = null;
private MapIterator(int numRecords, Location loc, boolean destructive) {
this.numRecords = numRecords;
this.loc = loc;
this.destructive = destructive;
if (destructive) {
destructiveIterator = this;
// longArray will not be used anymore if destructive is true, release it now.
if (longArray != null) {
freeArray(longArray);
longArray = null;
}
}
}
private void advanceToNextPage() {
      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
      // to free a memory page by calling `freePage`. At the same time, it is possible that another
      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
      // a reference to the page to free and free it after releasing the lock of `MapIterator`.
MemoryBlock pageToFree = null;
try {
synchronized (this) {
int nextIdx = dataPages.indexOf(currentPage) + 1;
if (destructive && currentPage != null) {
dataPages.remove(currentPage);
pageToFree = currentPage;
nextIdx--;
}
if (dataPages.size() > nextIdx) {
currentPage = dataPages.get(nextIdx);
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
offsetInPage += UnsafeAlignedOffset.getUaoSize();
} else {
currentPage = null;
if (reader != null) {
handleFailedDelete();
}
try {
Closeables.close(reader, /* swallowIOException = */ false);
reader = spillWriters.getFirst().getReader(serializerManager);
recordsInPage = -1;
} catch (IOException e) {
              // Scala iterators do not handle exceptions
Platform.throwException(e);
}
}
}
} finally {
if (pageToFree != null) {
freePage(pageToFree);
}
}
}
@Override
public boolean hasNext() {
if (numRecords == 0) {
if (reader != null) {
handleFailedDelete();
}
}
return numRecords > 0;
}
@Override
public Location next() {
if (recordsInPage == 0) {
advanceToNextPage();
}
numRecords--;
if (currentPage != null) {
int totalLength = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
loc.with(currentPage, offsetInPage);
// [total size] [key size] [key] [value] [pointer to next]
offsetInPage += UnsafeAlignedOffset.getUaoSize() + totalLength + 8;
        recordsInPage--;
return loc;
} else {
assert(reader != null);
if (!reader.hasNext()) {
advanceToNextPage();
}
try {
reader.loadNext();
} catch (IOException e) {
try {
reader.close();
          } catch (IOException e2) {
logger.error("Error while closing spill reader", e2);
}
          // Scala iterators do not handle exceptions
Platform.throwException(e);
}
loc.with(reader.getBaseObject(), reader.getBaseOffset(), reader.getRecordLength());
return loc;
}
}
public synchronized long spill(long numBytes) throws IOException {
if (!destructive || dataPages.size() == 1) {
return 0L;
}
updatePeakMemoryUsed();
// TODO: use existing ShuffleWriteMetrics
ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
long released = 0L;
while (dataPages.size() > 0) {
MemoryBlock block = dataPages.getLast();
        // The currentPage is still in use and cannot be released
if (block == currentPage) {
break;
}
Object base = block.getBaseObject();
long offset = block.getBaseOffset();
int numRecords = UnsafeAlignedOffset.getSize(base, offset);
int uaoSize = UnsafeAlignedOffset.getUaoSize();
offset += uaoSize;
final UnsafeSorterSpillWriter writer =
new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
while (numRecords > 0) {
int length = UnsafeAlignedOffset.getSize(base, offset);
writer.write(base, offset + uaoSize, length, 0);
offset += uaoSize + length + 8;
numRecords--;
}
writer.close();
spillWriters.add(writer);
dataPages.removeLast();
released += block.size();
freePage(block);
if (released >= numBytes) {
break;
}
}
return released;
}
private void handleFailedDelete() {
if (spillWriters.size() > 0) {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists() && !file.delete()) {
logger.error("Was unable to delete spill file {}",
MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
}
}
}
}
/**
* Returns an iterator for iterating over the entries of this map.
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
   * The returned iterator is thread-safe. However, if the map is modified while iterating over
   * it, the behavior of the returned iterator is undefined.
*/
public MapIterator iterator() {
return new MapIterator(numValues, new Location(), false);
}
/**
   * Returns a destructive iterator for iterating over the entries of this map. It frees each page
   * as it moves onto the next one. Note: it is illegal to call any method on the map after
   * `destructiveIterator()` has been called.
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
   * The returned iterator is thread-safe. However, if the map is modified while iterating over
   * it, the behavior of the returned iterator is undefined.
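   *
   * A minimal consumption sketch; the `process` call and the `map` variable are illustrative:
   * <pre>
   *   MapIterator iter = map.destructiveIterator();
   *   while (iter.hasNext()) {
   *     Location loc = iter.next();
   *     process(loc.getKeyBase(), loc.getKeyOffset(), loc.getKeyLength(),
   *             loc.getValueBase(), loc.getValueOffset(), loc.getValueLength());
   *   }
   * </pre>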
*/
public MapIterator destructiveIterator() {
updatePeakMemoryUsed();
return new MapIterator(numValues, new Location(), true);
}
/**
   * Iterator for the entries of this map. This first iterates over the key indices in
   * `longArray` and then accesses the values in `dataPages`. NOTE: this differs from
   * `MapIterator` in that the key index is preserved here
   * (see `UnsafeHashedRelation` for an example of usage).
*/
public final class MapIteratorWithKeyIndex implements Iterator<Location> {
/**
* The index in `longArray` where the key is stored.
*/
private int keyIndex = 0;
private int numRecords;
private final Location loc;
private MapIteratorWithKeyIndex() {
this.numRecords = numValues;
this.loc = new Location();
}
@Override
public boolean hasNext() {
return numRecords > 0;
}
@Override
public Location next() {
if (!loc.isDefined() || !loc.nextValue()) {
while (longArray.get(keyIndex * 2) == 0) {
keyIndex++;
}
loc.with(keyIndex, 0, true);
keyIndex++;
}
numRecords--;
return loc;
}
}
/**
* Returns an iterator for iterating over the entries of this map,
* by first iterating over the key index inside hash map's `longArray`.
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
* The returned iterator is NOT thread-safe. If the map is modified while iterating over it,
* the behavior of the returned iterator is undefined.
*/
public MapIteratorWithKeyIndex iteratorWithKeyIndex() {
return new MapIteratorWithKeyIndex();
}
/**
   * The maximum number of key indices.
   *
   * Valid key indices are in the range [0, maxNumKeysIndex - 1].
*/
public int maxNumKeysIndex() {
return (int) (longArray.size() / 2);
}
/**
   * Looks up a key and returns a {@link Location} handle that can be used to test existence
* and read/write values.
*
* This function always returns the same {@link Location} instance to avoid object allocation.
* This function is not thread-safe.
*/
public Location lookup(Object keyBase, long keyOffset, int keyLength) {
safeLookup(keyBase, keyOffset, keyLength, loc,
Murmur3_x86_32.hashUnsafeWords(keyBase, keyOffset, keyLength, 42));
return loc;
}
/**
   * Looks up a key and returns a {@link Location} handle that can be used to test existence
* and read/write values.
*
* This function always returns the same {@link Location} instance to avoid object allocation.
* This function is not thread-safe.
*/
public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) {
safeLookup(keyBase, keyOffset, keyLength, loc, hash);
return loc;
}
/**
   * Looks up a key and saves the result in the provided `loc`.
   *
   * This is a thread-safe version of `lookup` and can be used by multiple threads.
*/
public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) {
assert(longArray != null);
numKeyLookups++;
int pos = hash & mask;
int step = 1;
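    // Quadratic probing with triangular numbers: successive probes are offset from the initial
    // slot by 0, 1, 3, 6, 10, ..., which is guaranteed to visit every slot of a
    // power-of-2-sized table.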
while (true) {
numProbes++;
if (longArray.get(pos * 2) == 0) {
// This is a new key.
loc.with(pos, hash, false);
return;
} else {
long stored = longArray.get(pos * 2 + 1);
if ((int) (stored) == hash) {
// Full hash code matches. Let's compare the keys for equality.
loc.with(pos, hash, true);
if (loc.getKeyLength() == keyLength) {
final boolean areEqual = ByteArrayMethods.arrayEquals(
keyBase,
keyOffset,
loc.getKeyBase(),
loc.getKeyOffset(),
keyLength
);
if (areEqual) {
return;
}
}
}
}
pos = (pos + step) & mask;
step++;
}
}
/**
* Handle returned by {@link BytesToBytesMap#lookup(Object, long, int)} function.
*/
public final class Location {
/** An index into the hash map's Long array */
private int pos;
/** True if this location points to a position where a key is defined, false otherwise */
private boolean isDefined;
/**
* The hashcode of the most recent key passed to
* {@link BytesToBytesMap#lookup(Object, long, int, int)}. Caching this hashcode here allows us
* to avoid re-hashing the key when storing a value for that key.
*/
private int keyHashcode;
private Object baseObject; // the base object for key and value
private long keyOffset;
private int keyLength;
private long valueOffset;
private int valueLength;
/**
* Memory page containing the record. Only set if created by {@link BytesToBytesMap#iterator()}.
*/
@Nullable private MemoryBlock memoryPage;
private void updateAddressesAndSizes(long fullKeyAddress) {
updateAddressesAndSizes(
taskMemoryManager.getPage(fullKeyAddress),
taskMemoryManager.getOffsetInPage(fullKeyAddress));
}
private void updateAddressesAndSizes(final Object base, long offset) {
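      // Record layout: [total length][key length][key bytes][value bytes][8-byte next pointer],
      // where the two length fields each occupy uaoSize bytes.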
baseObject = base;
final int totalLength = UnsafeAlignedOffset.getSize(base, offset);
int uaoSize = UnsafeAlignedOffset.getUaoSize();
offset += uaoSize;
keyLength = UnsafeAlignedOffset.getSize(base, offset);
offset += uaoSize;
keyOffset = offset;
valueOffset = offset + keyLength;
valueLength = totalLength - keyLength - uaoSize;
}
private Location with(int pos, int keyHashcode, boolean isDefined) {
assert(longArray != null);
this.pos = pos;
this.isDefined = isDefined;
this.keyHashcode = keyHashcode;
if (isDefined) {
final long fullKeyAddress = longArray.get(pos * 2);
updateAddressesAndSizes(fullKeyAddress);
}
return this;
}
private Location with(MemoryBlock page, long offsetInPage) {
this.isDefined = true;
this.memoryPage = page;
updateAddressesAndSizes(page.getBaseObject(), offsetInPage);
return this;
}
/**
* This is only used for spilling
*/
private Location with(Object base, long offset, int length) {
this.isDefined = true;
this.memoryPage = null;
baseObject = base;
int uaoSize = UnsafeAlignedOffset.getUaoSize();
keyOffset = offset + uaoSize;
keyLength = UnsafeAlignedOffset.getSize(base, offset);
valueOffset = offset + uaoSize + keyLength;
valueLength = length - uaoSize - keyLength;
return this;
}
/**
     * Finds the next pair that has the same key as the current one.
*/
public boolean nextValue() {
assert isDefined;
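      // The last 8 bytes of the record store the address of the next pair with the same key,
      // or 0 if this is the last one.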
long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength);
if (nextAddr == 0) {
return false;
} else {
updateAddressesAndSizes(nextAddr);
return true;
}
}
/**
* Returns the memory page that contains the current record.
* This is only valid if this is returned by {@link BytesToBytesMap#iterator()}.
*/
public MemoryBlock getMemoryPage() {
return this.memoryPage;
}
/**
* Returns true if the key is defined at this position, and false otherwise.
*/
public boolean isDefined() {
return isDefined;
}
/**
     * Returns the index of the key.
*/
public int getKeyIndex() {
assert (isDefined);
return pos;
}
/**
     * Returns the base object for the key.
*/
public Object getKeyBase() {
assert (isDefined);
return baseObject;
}
/**
     * Returns the offset of the key.
*/
public long getKeyOffset() {
assert (isDefined);
return keyOffset;
}
/**
     * Returns the base object for the value.
*/
public Object getValueBase() {
assert (isDefined);
return baseObject;
}
/**
     * Returns the offset of the value.
*/
public long getValueOffset() {
assert (isDefined);
return valueOffset;
}
/**
* Returns the length of the key defined at this position.
* Unspecified behavior if the key is not defined.
*/
public int getKeyLength() {
assert (isDefined);
return keyLength;
}
/**
* Returns the length of the value defined at this position.
* Unspecified behavior if the key is not defined.
*/
public int getValueLength() {
assert (isDefined);
return valueLength;
}
/**
* Append a new value for the key. This method could be called multiple times for a given key.
* The return value indicates whether the put succeeded or whether it failed because additional
* memory could not be acquired.
* <p>
* It is only valid to call this method immediately after calling `lookup()` using the same key.
* </p>
* <p>
* The key and value must be word-aligned (that is, their sizes must be a multiple of 8).
* </p>
* <p>
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
* will return information on the data stored by this `append` call.
* </p>
* <p>
* As an example usage, here's the proper way to store a new key:
* </p>
* <pre>
* Location loc = map.lookup(keyBase, keyOffset, keyLength);
* if (!loc.isDefined()) {
* if (!loc.append(keyBase, keyOffset, keyLength, ...)) {
* // handle failure to grow map (by spilling, for example)
* }
* }
* </pre>
* <p>
* Unspecified behavior if the key is not defined.
* </p>
*
* @return true if the put() was successful and false if the put() failed because memory could
* not be acquired.
*/
public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) {
assert (klen % 8 == 0);
assert (vlen % 8 == 0);
assert (longArray != null);
      // We should not let the number of keys reach MAX_CAPACITY. The usage pattern of this map is
      // lookup + append. If the number of keys reached MAX_CAPACITY, a later call to lookup could
      // hang forever because it would never find an empty slot.
if (numKeys == MAX_CAPACITY - 1
        // The map could be reused after a previous spill (because there was not enough memory to
        // grow), in which case we don't try to grow again after hitting the `growthThreshold`.
|| !canGrowArray && numKeys >= growthThreshold) {
return false;
}
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (total length) (key length) (key) (value) (8 byte pointer to next value)
int uaoSize = UnsafeAlignedOffset.getUaoSize();
final long recordLength = (2L * uaoSize) + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + uaoSize)) {
return false;
}
}
// --- Append the key and value data to the current data page --------------------------------
final Object base = currentPage.getBaseObject();
long offset = currentPage.getBaseOffset() + pageCursor;
final long recordOffset = offset;
UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
offset += (2L * uaoSize);
Platform.copyMemory(kbase, koff, base, offset, klen);
offset += klen;
Platform.copyMemory(vbase, voff, base, offset, vlen);
offset += vlen;
// put this value at the beginning of the list
Platform.putLong(base, offset, isDefined ? longArray.get(pos * 2) : 0);
// --- Update bookkeeping data structures ----------------------------------------------------
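      // The first uaoSize bytes of each page store the number of records in that page;
      // increment that count for the record we just wrote.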
offset = currentPage.getBaseOffset();
UnsafeAlignedOffset.putSize(base, offset, UnsafeAlignedOffset.getSize(base, offset) + 1);
pageCursor += recordLength;
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
updateAddressesAndSizes(storedKeyAddress);
numValues++;
if (!isDefined) {
numKeys++;
longArray.set(pos * 2 + 1, keyHashcode);
isDefined = true;
// If the map has reached its growth threshold, try to grow it.
if (numKeys >= growthThreshold) {
// We use two array entries per key, so the array size is twice the capacity.
// We should compare the current capacity of the array, instead of its size.
if (longArray.size() / 2 < MAX_CAPACITY) {
try {
growAndRehash();
} catch (SparkOutOfMemoryError oom) {
canGrowArray = false;
}
} else {
// The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from
// accepting any more new elements to make sure we don't exceed the load factor. If we
// need to spill later, this allows UnsafeKVExternalSorter to reuse the array for
// sorting.
canGrowArray = false;
}
}
}
return true;
}
}
/**
* Acquire a new page from the memory manager.
* @return whether there is enough space to allocate the new page.
*/
private boolean acquireNewPage(long required) {
try {
currentPage = allocatePage(required);
} catch (SparkOutOfMemoryError e) {
return false;
}
dataPages.add(currentPage);
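    // Initialize the page header (the first uaoSize bytes), which stores the page's record count.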
UnsafeAlignedOffset.putSize(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
pageCursor = UnsafeAlignedOffset.getUaoSize();
return true;
}
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
if (trigger != this && destructiveIterator != null) {
return destructiveIterator.spill(size);
}
return 0L;
}
/**
* Allocate new data structures for this map. When calling this outside of the constructor,
* make sure to keep references to the old data structures so that you can free them.
*
* @param capacity the new map capacity
*/
private void allocate(int capacity) {
assert (capacity >= 0);
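    // Round the requested capacity up to the next power of 2, clamped to the range
    // [64, MAX_CAPACITY].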
capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64);
assert (capacity <= MAX_CAPACITY);
longArray = allocateArray(capacity * 2L);
longArray.zeroOut();
this.growthThreshold = (int) (capacity * loadFactor);
this.mask = capacity - 1;
}
/**
* Free all allocated memory associated with this map, including the storage for keys and values
* as well as the hash map array itself.
*
* This method is idempotent and can be called multiple times.
*/
public void free() {
updatePeakMemoryUsed();
if (longArray != null) {
freeArray(longArray);
longArray = null;
}
Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
while (dataPagesIterator.hasNext()) {
MemoryBlock dataPage = dataPagesIterator.next();
dataPagesIterator.remove();
freePage(dataPage);
}
assert(dataPages.isEmpty());
while (!spillWriters.isEmpty()) {
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
logger.error("Was unable to delete spill file {}",
MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
}
}
}
}
public TaskMemoryManager getTaskMemoryManager() {
return taskMemoryManager;
}
public long getPageSizeBytes() {
return pageSizeBytes;
}
/**
* Returns the total amount of memory, in bytes, consumed by this map's managed structures.
*/
public long getTotalMemoryConsumption() {
long totalDataPagesSize = 0L;
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
}
private void updatePeakMemoryUsed() {
long mem = getTotalMemoryConsumption();
if (mem > peakMemoryUsedBytes) {
peakMemoryUsedBytes = mem;
}
}
/**
   * Returns the peak memory used so far, in bytes.
*/
public long getPeakMemoryUsedBytes() {
updatePeakMemoryUsed();
return peakMemoryUsedBytes;
}
/**
* Returns the average number of probes per key lookup.
*/
public double getAvgHashProbesPerKey() {
return (1.0 * numProbes) / numKeyLookups;
}
@VisibleForTesting
public int getNumDataPages() {
return dataPages.size();
}
/**
   * Returns the underlying {@code LongArray} of this map.
*/
public LongArray getArray() {
assert(longArray != null);
return longArray;
}
/**
   * Resets this map to its initialized state.
*/
public void reset() {
updatePeakMemoryUsed();
numKeys = 0;
numValues = 0;
freeArray(longArray);
longArray = null;
while (dataPages.size() > 0) {
MemoryBlock dataPage = dataPages.removeLast();
freePage(dataPage);
}
allocate(initialCapacity);
canGrowArray = true;
currentPage = null;
pageCursor = 0;
}
/**
   * Grows the hash table and re-hashes everything.
*/
@VisibleForTesting
void growAndRehash() {
assert(longArray != null);
// Store references to the old data structures to be used when we re-hash
final LongArray oldLongArray = longArray;
final int oldCapacity = (int) oldLongArray.size() / 2;
// Allocate the new data structures
allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
for (int i = 0; i < oldLongArray.size(); i += 2) {
final long keyPointer = oldLongArray.get(i);
if (keyPointer == 0) {
continue;
}
final int hashcode = (int) oldLongArray.get(i + 1);
int newPos = hashcode & mask;
int step = 1;
while (longArray.get(newPos * 2) != 0) {
newPos = (newPos + step) & mask;
step++;
}
longArray.set(newPos * 2, keyPointer);
longArray.set(newPos * 2 + 1, hashcode);
}
freeArray(oldLongArray);
}
}