blob: d0ac2f8e2359c0756cce403d150f36fb9d46e51e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.offheap;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A hash table which can be off-heaped and uses probing.<p/>
*
* Not thread-safe. Requires external synchronization.<p/>
*
* Each entry must be stored in a slot which takes a fixed number of bytes. We
* assume that slots which are zeroed are empty.<p/>
*
* This hash table does not implement the Java collection interface, because we
* want to avoid some of the limitations of that interface. For example, we
* want to be able to have more than 2^32 entries and to be able to use a hash
* function which is wider than 32 bits.<p/>
*
* This hash table uses linear probing rather than separate chaining to handle
* hash collisions. When we hit a collision when inserting, we put the new
* element into the next open slot.<p/>
*
* When the hash table gets more than a certain percent full, we double the size
* of the table. This requires moving all existing entries.<p/>
*/
@Public
@Unstable
public class ProbingHashTable<K extends ProbingHashTable.Key,
E extends ProbingHashTable.Entry<K>> implements Closeable {
/**
* Logger shared by the hash table and its iterator for debug, trace, info and
* error messages.
*/
static final Logger LOG =
LoggerFactory.getLogger(ProbingHashTable.class);
/**
* Adapts a given entry class to work with this hash table.<p/>
*
* Specifically, the Adaptor reports the fixed slot size and handles storing
* entries into slots, loading them back, and clearing slots. Hash codes are
* not part of this interface; the table obtains them from the entry's key
* via {@link Key#longHash()}.<p/>
*
* @param <E> The entry type that is stored in the slots.
*/
public interface Adaptor<E> {
/**
* Get the slot size to use for this hash table.
*
* The table calls this once, in its constructor, and uses the result to
* compute slot addresses.
*
* @return How many bytes each slot is in the hash table.
*/
int getSlotSize();
/**
* Load an entry from memory.
*
* The table treats a null return value as "this slot is empty" when probing.
*
* @param addr The address to use to create the entry.
*
* @return null if the slot was empty; the entry, otherwise.
*/
E load(long addr);
/**
* Store an entry to memory.
*
* @param e The element to store to memory.
* @param addr The address to store the element to.
*/
void store(E e, long addr);
/**
* Clear a slot.
*
* After a slot has been cleared, the table expects {@link #load(long)} on the
* same address to return null.
*
* @param addr The address to clear.
*/
void clear(long addr);
}
/**
* A key which can be used to look up entries in the ProbingHashTable.
*
* The table uses {@link #longHash()} to pick the slot where probing starts,
* {@link #equals(Object)} to compare the probed keys with the requested key,
* and {@link #toString()} when building log and error messages.
*/
public interface Key {
/**
* Get a 64-bit hash code for this key.
*
* @return The hash code. Any value is accepted, including negative ones.
*/
long longHash();
/**
* Determine if this key equals another key.
*
* @param other The object to compare with.
*
* @return true if the two keys should be treated as the same key.
*/
boolean equals(Object other);
/**
* Get a human-readable representation of this key.
*
* @return The string used for this key in log and error messages.
*/
String toString();
}
/**
* An entry in the ProbingHashTable.
*
* The table only needs to be able to get the key of an entry; everything else
* about the entry is handled by the {@link Adaptor}.
*
* @param <K> The key type of this entry.
*/
public interface Entry<K extends Key> {
/**
* Get the key for this entry.
*
* @return The key. The table calls methods on the returned key without a
* null check, so this must not be null.
*/
K getKey();
}
/**
 * The minimum initial size to allow. Smaller requested sizes are raised to
 * this value by the constructor.
 */
private static final long MIN_SIZE = 4;

/**
 * The name of this hash table. Used in toString and therefore in log
 * messages.
 */
private final String name;

/**
 * The memory manager which allocates and frees the slot memory.
 */
private final MemoryManager mman;

/**
 * The size of each slot in bytes, as reported by the adaptor.
 */
private final int slotSize;

/**
 * The adaptor used to load, store and clear entries in slots.
 */
private final Adaptor<E> adaptor;

/**
 * The base address of the hash table. Zero once the memory has been freed.
 */
private long base;

/**
 * The current number of slots in the hash table.
 */
private long numSlots;

/**
 * The current number of entries in the hash table.
 */
private long numEntries;

/**
 * The maximum load factor for this hash table.
 */
private float maxLoadFactor;

/**
 * The number of entries we should double at. Computed as
 * numSlots * maxLoadFactor, truncated to a long.
 */
private long expansionThreshold;
/**
 * Round a value up to the nearest power of 2.
 *
 * @param i    The value to round up.
 *
 * @return     The smallest power of 2 which is greater than or equal to i.
 *             Values less than or equal to 1 (including negative values)
 *             yield 1.
 *
 * @throws IllegalArgumentException  If i is greater than 2^62, since the
 *             next power of 2 cannot be represented in a signed long.
 */
public static long roundUpToPowerOf2(long i) {
  if (i <= 1) {
    return 1;
  }
  // 2^62 is the largest power of 2 that fits in a long. Repeatedly shifting
  // past it would wrap to a negative value and then to zero, which would
  // never satisfy a "keep doubling until large enough" loop.
  if (i > (1L << 62)) {
    throw new IllegalArgumentException("Cannot round " + i +
        " up to a power of 2 without overflowing a long.");
  }
  // For i >= 2, the highest set bit of (i - 1) is the largest power of 2
  // strictly below i; doubling it gives the smallest power of 2 >= i.
  return Long.highestOneBit(i - 1) << 1;
}
/**
 * Create a new ProbingHashTable.
 *
 * @param name            The name of the ProbingHashTable.
 * @param mman            The memory manager to use.
 * @param adaptor         The entry factory to use.
 * @param initialSize     The initial size of the hash table. Values below
 *                          the minimum size are raised to the minimum. The
 *                          number of slots allocated is
 *                          initialSize / maxLoadFactor, rounded up to a
 *                          power of 2.
 * @param maxLoadFactor   The maximum load factor to allow before doubling
 *                          the hash table size. Must be strictly between
 *                          0 and 1.
 *
 * @throws IllegalArgumentException  If maxLoadFactor is not strictly
 *                          between 0 and 1.
 */
public ProbingHashTable(String name, MemoryManager mman, Adaptor<E> adaptor,
    long initialSize, float maxLoadFactor) {
  // Validate the load factor before it is used as a divisor and before any
  // memory is allocated, so that a bad argument cannot leak an allocation
  // or produce a nonsensical slot count.
  Preconditions.checkArgument(maxLoadFactor > 0.0f);
  Preconditions.checkArgument(maxLoadFactor < 1.0f);
  this.name = name;
  this.mman = mman;
  this.slotSize = adaptor.getSlotSize();
  this.adaptor = adaptor;
  if (initialSize < MIN_SIZE) {
    initialSize = MIN_SIZE;
  }
  this.numSlots = roundUpToPowerOf2((long)(initialSize / maxLoadFactor));
  long allocLen = numSlots * slotSize;
  // Zeroed memory is required because empty slots are detected by loading
  // them through the adaptor.
  this.base = mman.allocateZeroed(allocLen);
  this.numEntries = 0;
  this.maxLoadFactor = maxLoadFactor;
  this.expansionThreshold = (long)(numSlots * maxLoadFactor);
  LOG.debug("Created ProbingHashTable(name={}, mman={}, slotSize={}, " +
      "adaptor={}, numSlots={}, base=0x{}, allocLen=0x{}," +
      "maxLoadFactor={}, expansionThreshold={})",
      name, mman.toString(), slotSize,
      adaptor.getClass().getCanonicalName(), numSlots,
      Long.toHexString(base), Long.toHexString(allocLen), maxLoadFactor,
      expansionThreshold);
}
/**
 * Checks that the hash table is empty and then frees its memory.
 *
 * If any entries remain, nothing is freed and a RuntimeException is thrown
 * whose message lists up to ten of the remaining keys.
 *
 * @throws IOException       Declared for the call to free().
 * @throws RuntimeException  If the table still contains entries.
 */
public void close() throws IOException {
  final ProbingHashTableIterator leftovers = iterator();
  if (!leftovers.hasNext()) {
    // Nothing left behind, so the memory can be released.
    free();
    return;
  }
  // Build a short sample of the keys that were left behind.
  final StringBuilder sample = new StringBuilder();
  sample.append(leftovers.next().toString());
  for (int shown = 1; leftovers.hasNext(); shown++) {
    if (shown >= 10) {
      sample.append("...");
      break;
    }
    sample.append(", ").append(leftovers.next().toString());
  }
  throw new RuntimeException("Attempted to close the hash table " +
      " before all entries were removed. There are still " + numEntries +
      " entries remaining, including " + sample.toString());
}
/**
 * Releases the memory region backing this hash table, if it has not been
 * released already. The base address is reset to zero afterwards, so calling
 * this method again does nothing.
 *
 * @throws IOException  Declared by this method's signature.
 */
void free() throws IOException {
  if (this.base == 0) {
    // Already freed.
    return;
  }
  LOG.debug("Freeing {}.", this);
  mman.free(this.base);
  this.base = 0;
}
/**
 * Safety net for tables which were never closed: logs an error and frees the
 * memory if the base address is still set. Always chains to the superclass
 * finalizer.
 */
protected void finalize() throws Throwable {
  try {
    if (this.base == 0) {
      // The memory was already released; nothing to clean up.
      return;
    }
    LOG.error("Hash table {} was never closed.", this);
    free();
  } finally {
    super.finalize();
  }
}
/**
 * Compute the slot at which probing for a key starts.
 *
 * @param key     The key to hash.
 * @param nSlots  The number of slots in the table. Must be positive.
 *
 * @return        A slot index in the range [0, nSlots).
 */
private long getSlot(K key, long nSlots) {
  // Take the remainder first and its absolute value second. Negating the
  // hash first does not work for Long.MIN_VALUE, whose negation is still
  // negative. The remainder is strictly smaller in magnitude than nSlots,
  // so its absolute value cannot overflow. For every other hash value this
  // yields the same slot as negating first.
  long remainder = key.longHash() % nSlots;
  return Math.abs(remainder);
}
/**
 * Look up a key by linear probing, optionally removing the entry found.
 *
 * Probing starts at the key's home slot and walks forward, wrapping at the
 * end of the table. It stops at the first empty slot, at a slot whose key
 * equals the requested key, or after every slot has been visited once.
 *
 * @param key     The key to search for.
 * @param remove  If true, the matching slot is cleared, the entry count is
 *                decremented, and the following slots are re-packed.
 *
 * @return        The matching entry, or null if there is none.
 */
private E getInternal(K key, boolean remove) {
  final long firstSlot = getSlot(key, numSlots);
  long probe = firstSlot;
  do {
    final long probeAddr = this.base + (probe * slotSize);
    final E candidate = adaptor.load(probeAddr);
    if (candidate == null) {
      // An empty slot ends the probe sequence: by the compactness
      // invariant the key cannot be stored any further along.
      LOG.trace("{}: getInternal(key={}, remove={}) found nothing.",
          this, key, remove);
      return null;
    }
    final K candidateKey = candidate.getKey();
    if (candidateKey.equals(key)) {
      if (remove) {
        adaptor.clear(probeAddr);
        numEntries--;
        maintainCompactness(probe);
      }
      LOG.trace("{}: getInternal(key={}, remove={}) found {}",
          this, key, remove, candidateKey);
      return candidate;
    }
    probe = (probe + 1 == numSlots) ? 0 : probe + 1;
  } while (probe != firstSlot);
  // Every slot was occupied by some other key.
  LOG.trace("{}: getInternal(key={}, remove={}) found nothing",
      this, key, remove);
  return null;
}
/**
 * Restore the compactness invariant after a slot has been cleared.<p/>
 *
 * Lookups stop probing at the first empty slot, so an entry must never be
 * separated from its home slot by an empty slot. Clearing a slot can break
 * that rule for the entries which follow it. To repair this, we walk forward
 * from the cleared slot and re-insert every entry we meet. Re-insertion
 * either finds the entry where it already is, or stores it into an earlier
 * empty slot, in which case the old slot is cleared. The walk ends at the
 * first empty slot, or once it has wrapped around to the starting slot.<p/>
 *
 * The cost is proportional to the length of the run of occupied slots which
 * follows the cleared slot.<p/>
 *
 * @param startSlot  The slot which has just been cleared.
 */
private void maintainCompactness(long startSlot) {
  long cursor = startSlot;
  for (;;) {
    cursor = (cursor + 1 == numSlots) ? 0 : cursor + 1;
    if (cursor == startSlot) {
      // Wrapped all the way around.
      return;
    }
    final long cursorAddr = this.base + (cursor * slotSize);
    final E occupant = adaptor.load(cursorAddr);
    if (occupant == null) {
      // End of the run of occupied slots.
      return;
    }
    if (putInternal(occupant, false) == null) {
      // The entry was stored into an earlier empty slot. That counted as
      // an insertion, so undo the count and vacate the old slot.
      numEntries--;
      adaptor.clear(cursorAddr);
      if (LOG.isTraceEnabled()) {
        LOG.trace("{}: moved {} to the right place.",
            this, occupant.getKey());
      }
    } else if (LOG.isTraceEnabled()) {
      // Probing from the home slot reached the entry itself.
      LOG.trace("{}: {} was already in the right place.",
          this, occupant.getKey());
    }
  }
}
/**
 * Remove the entry with the given key from the hash table.
 *
 * @param key  The key to remove.
 *
 * @return     The entry which was removed, or null if there was no entry
 *             with this key.
 */
public E remove(K key) {
  final boolean removeIfFound = true;
  return getInternal(key, removeIfFound);
}
/**
 * Look up the entry with the given key without modifying the hash table.
 *
 * @param key  The key to look up.
 *
 * @return     The matching entry, or null if there was no entry with this
 *             key.
 */
public E get(K key) {
  final boolean removeIfFound = false;
  return getInternal(key, removeIfFound);
}
/**
* Move the hash table into a newly allocated memory region with a different
* number of slots.
*
* A zeroed region of newNumSlots * slotSize bytes is allocated, the table's
* fields are pointed at it, and every entry found in the old region is
* re-inserted with putInternal. On success the old region is freed. If
* anything is thrown while re-inserting, the old region and counters are
* restored, the new region is freed, and a RuntimeException wrapping the
* cause is thrown.
*
* @param newNumSlots The number of slots in the new region.
*/
private void expandTable(long newNumSlots) {
LOG.info("{}: Expanding table from {} slots to {}...",
this, numSlots, newNumSlots);
// The allocation happens before any field is changed, so if it throws the
// table is left untouched.
// NOTE(review): newNumSlots * slotSize is not checked for overflow.
long newBase = mman.allocateZeroed(newNumSlots * slotSize);
// Remember the old state so that it can be restored if re-insertion fails.
long oldNumSlots = this.numSlots;
long oldExpansionThreshold = this.expansionThreshold;
long oldBase = this.base;
long oldNumEntries = this.numEntries;
try {
// Switch the hash table over to using the new memory region.
long entriesRemaining = oldNumEntries;
this.numSlots = newNumSlots;
this.expansionThreshold = (long)(newNumSlots * maxLoadFactor);
this.base = newBase;
// putInternal increments numEntries for each entry it stores, so start
// counting from zero again.
this.numEntries = 0;
// Scan every slot of the old region and re-insert what we find.
for (long slot = 0; slot < oldNumSlots; slot++) {
long addr = oldBase + (slot * slotSize);
E e = adaptor.load(addr);
if (e != null) {
E prevEntry = putInternal(e, false);
if (prevEntry != null) {
// A non-null result means an equal key was already stored in the new
// region, so this entry was not copied. This is logged but not treated
// as fatal.
LOG.error("{}: Unexpected duplicate encountered when resizing " +
"hash table: entry {} duplicates {}.", this,
e.getKey(), prevEntry.getKey()
);
}
entriesRemaining--;
}
}
// Sanity check: the number of entries found in the old region should match
// the entry count we had before. A mismatch is logged but not fatal.
if (entriesRemaining != 0) {
LOG.error("{}: Unexpectedly failed to locate {} entries that we " +
"thought we needed to move when resizing the hash table.",
this, entriesRemaining
);
}
// Note that this message is logged before the old region is actually freed
// at the end of this method.
LOG.info("{}: Finished expanding hash table from {} slots to {}. " +
"Moved {} keys. Freed old memory base 0x{}. Using new memory " +
"base 0x{}.", this, oldNumSlots, numSlots, numEntries,
Long.toHexString(oldBase), Long.toHexString(newBase));
} catch (Throwable t) {
// In general we should never get here, since the functions used
// above should not throw exceptions. But it's nice to be safe.
LOG.error("{}: expanding failed! Restoring old memory region.", this, t);
// Switch back to using the old memory region.
this.numSlots = oldNumSlots;
this.expansionThreshold = oldExpansionThreshold;
this.base = oldBase;
this.numEntries = oldNumEntries;
// The old region was only read from, so it is still intact. Discard the
// partially filled new region.
mman.free(newBase);
throw new RuntimeException("Failed to expand " + this, t);
}
// Success: the old region is no longer referenced by the table.
mman.free(oldBase);
}
/**
 * Expand the hash table if it would need to expand to hold another key.
 *
 * The table is doubled as soon as the number of entries has reached the
 * expansion threshold, so that adding one more entry never pushes the table
 * past its maximum load factor and never fills the last empty slot.
 */
private void expandTableIfNeeded() {
  // Use >= rather than >: this check runs before the new entry is added,
  // so waiting until the threshold is already exceeded would allow one
  // entry too many, which can completely fill a small table.
  if (numEntries >= expansionThreshold) {
    expandTable(numSlots * 2L);
  }
}
/**
 * Store an entry into the hash table using linear probing.
 *
 * Probing starts at the home slot of the entry's key. If an empty slot is
 * reached first, the entry is stored there and the entry count is
 * incremented. If a slot holding an equal key is reached first, the result
 * depends on the overwrite flag.
 *
 * @param putEntry      The entry to store.
 * @param overwrite     If true, we will overwrite the entry which is equal
 *                        to putEntry (if there is one.)  If false, we will
 *                        simply return that entry, but not overwrite it.
 *
 * @return              The previous entry in the hash table that was equal
 *                        to the one we wanted to insert.  null if there
 *                        was no such entry.
 *
 * @throws IllegalStateException  If every slot is occupied by some other
 *                        key, so that there is nowhere to store the entry.
 */
private E putInternal(E putEntry, boolean overwrite) {
  final K putKey = putEntry.getKey();
  final long homeSlot = getSlot(putKey, numSlots);
  long slot = homeSlot;
  while (true) {
    long addr = this.base + (slot * slotSize);
    E e = adaptor.load(addr);
    if (e == null) {
      // Found an empty slot before finding an equal key: insert here.
      adaptor.store(putEntry, addr);
      numEntries++;
      if (LOG.isTraceEnabled()) {
        LOG.trace("{}: stored {} into slot {} (addr 0x{})",
            this, putKey, slot, Long.toHexString(addr));
      }
      return null;
    }
    K k = e.getKey();
    if (k.equals(putKey)) {
      if (!overwrite) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{}: could not store {} because we found an " +
              "equivalent key {} in slot {} (addr 0x{})",
              this, putKey, k, slot, Long.toHexString(addr));
        }
        return e;
      }
      // Overwrite the existing entry.  The entry count does not change.
      adaptor.store(putEntry, addr);
      if (LOG.isTraceEnabled()) {
        LOG.trace("{}: stored {} by overwriting the equivalent key {} " +
            "in slot {} (addr 0x{})", this, putKey, k, slot,
            Long.toHexString(addr));
      }
      return e;
    }
    slot++;
    if (slot == numSlots) {
      slot = 0;
    }
    if (slot == homeSlot) {
      // We have wrapped all the way around without finding an empty slot
      // or an equal key.  Fail loudly rather than probing forever.
      throw new IllegalStateException(this + ": unable to store " + putKey +
          " because all " + numSlots + " slots are occupied.");
    }
  }
}
/**
 * Insert an entry unless the hash table already holds an entry with an
 * equal key. An existing entry is never replaced.
 *
 * @param putEntry      The entry to add.
 *
 * @return              null if the entry was inserted; otherwise, the
 *                        entry already in the table whose key equals the
 *                        key of putEntry.
 */
public E putIfAbsent(E putEntry) {
  // Grow the table before touching any slot, in case growing fails
  // (very unlikely).
  expandTableIfNeeded();
  final E existing = putInternal(putEntry, false);
  return existing;
}
/**
 * Insert an entry, replacing any entry already in the hash table whose key
 * is equal.
 *
 * @param putEntry      The entry to add.
 *
 * @return              null if no entry with an equal key was present;
 *                        otherwise, the entry which was replaced.
 */
public E put(E putEntry) {
  // Grow the table before touching any slot, in case growing fails
  // (very unlikely).
  expandTableIfNeeded();
  final E replaced = putInternal(putEntry, true);
  return replaced;
}
/**
 * Get the current capacity of the hash table.
 *
 * @return  The number of slots currently allocated, occupied or not.
 */
public long numSlots() {
  return this.numSlots;
}
/**
 * Get the number of entries stored in the hash table.
 *
 * @return  The current entry count.
 */
public long size() {
  return this.numEntries;
}
/**
 * Check whether the hash table holds no entries.
 *
 * @return  true if the entry count is zero.
 */
public boolean isEmpty() {
  return 0 == this.numEntries;
}
/**
 * An iterator over the keys of the ProbingHashTable.<p/>
 *
 * Since ProbingHashTable has no internal synchronization, you are responsible
 * for ensuring that there are no concurrent write operations on the hash
 * table while an iterator function is being called.  The easiest way to do
 * this is with external locking.<p/>
 *
 * You can still perform write operations after creating this iterator
 * without invalidating the iterator object.  There are a few caveats:<p/>
 * 1. Keys inserted after the iterator was created may or may not be
 *    returned by the iterator.<p/>
 * 2. If the hash table is enlarged due to adding more keys, this iterator
 *    may return keys more than once, and return some keys not at all.<p/>
 * 3. Removing a key (through the table or through this iterator) can move
 *    other entries to different slots, so keys which have not been returned
 *    yet may be missed, or returned a second time.<p/>
 */
private class ProbingHashTableIterator implements Iterator<K> {
  /**
   * The next slot to examine.  Set to Long.MAX_VALUE once the iterator has
   * run off the end of the table.
   */
  private long slotId = 0;

  /**
   * A key which has been fetched by look-ahead but not yet returned by
   * next().  null if there is no such key.
   */
  private K curKey;

  /**
   * The key most recently returned by next(), if it is still eligible for
   * remove().  null otherwise.
   */
  private K lastKey;

  /**
   * Scan forward from slotId until an occupied slot is found.
   *
   * @return  true if curKey was set to the key of the next occupied slot;
   *          false if there are no more occupied slots.
   */
  private boolean refillCurKey() {
    while (slotId < ProbingHashTable.this.numSlots) {
      long addr = base + (slotId * slotSize);
      E e = adaptor.load(addr);
      slotId++;
      if (e != null) {
        curKey = e.getKey();
        if (LOG.isTraceEnabled()) {
          LOG.trace("{}: iterator found another key {} at slot {} " +
              "(address 0x{})", ProbingHashTable.this.toString(), curKey,
              (slotId - 1), Long.toHexString(addr));
        }
        return true;
      }
    }
    LOG.trace("{}: no more keys to iterate over after reading all {} " +
        "slots.", ProbingHashTable.this.toString(), slotId);
    // Set slotId to Long.MAX_VALUE so that even if the hash table enlarges
    // in the future, this iterator will continue to be at the end.
    slotId = Long.MAX_VALUE;
    return false;
  }

  /**
   * @return  true if another key is available.  This may fetch the next key
   *          ahead of time.
   */
  @Override
  public boolean hasNext() {
    return (curKey != null) || refillCurKey();
  }

  /**
   * @return  The next key in the table.
   *
   * @throws NoSuchElementException  If there are no more keys, as required
   *          by the Iterator contract.
   */
  @Override
  public K next() {
    if ((curKey == null) && !refillCurKey()) {
      throw new NoSuchElementException();
    }
    K key = curKey;
    curKey = null;
    // Remember what we handed out so that remove() can delete it.
    lastKey = key;
    return key;
  }

  /**
   * Remove a key from the underlying hash table.<p/>
   *
   * When called directly after next(), this removes the key which next()
   * returned.  For compatibility with existing callers, if hasNext() has
   * already fetched a key which next() has not returned yet, that pending
   * key is removed (and skipped) instead.<p/>
   *
   * @throws IllegalStateException   If there is neither a pending key nor a
   *          key returned by next() which has not been removed yet.
   * @throws NoSuchElementException  If the key is no longer in the table.
   */
  @Override
  public void remove() {
    K key;
    if (curKey != null) {
      // Historical behavior: drop the key fetched by hasNext().
      key = curKey;
      curKey = null;
    } else if (lastKey != null) {
      // Standard iterator behavior: drop the key returned by next().
      key = lastKey;
    } else {
      throw new IllegalStateException();
    }
    // Each remove() call may only remove a single key.
    lastKey = null;
    if (ProbingHashTable.this.remove(key) == null) {
      throw new NoSuchElementException("No such element as " +
          key.toString());
    }
  }
}
/**
 * Create an iterator over the keys of this hash table, positioned at the
 * first slot.
 *
 * @return  A new iterator bound to this hash table.
 */
public ProbingHashTableIterator iterator() {
  return this.new ProbingHashTableIterator();
}
/**
 * @return  A short description of this hash table, consisting of the class
 *          name followed by the table's name in parentheses.
 */
@Override
public String toString() {
  StringBuilder description = new StringBuilder();
  description.append("ProbingHashTable(");
  description.append(name);
  description.append(")");
  return description.toString();
}
}