blob: fc2033bb6a67a40c43b2133acd6d77344c87714d [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mnemonic.collections;
import org.apache.mnemonic.ConfigurationException;
import org.apache.mnemonic.EntityFactoryProxy;
import org.apache.mnemonic.DurableType;
import org.apache.mnemonic.Durable;
import org.apache.mnemonic.EntityFactoryProxyHelper;
import org.apache.mnemonic.MemChunkHolder;
import org.apache.mnemonic.MemoryDurableEntity;
import org.apache.mnemonic.OutOfHybridMemory;
import org.apache.mnemonic.RestorableAllocator;
import org.apache.mnemonic.RestoreDurableEntityError;
import org.apache.mnemonic.RetrieveDurableEntityError;
import org.apache.mnemonic.Utils;
import org.apache.commons.lang3.ArrayUtils;
import org.flowcomputing.commons.resgc.ReclaimContext;
import sun.misc.Unsafe;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
 * Hash map whose bucket table is stored in a memory chunk obtained from a
 * {@code RestorableAllocator}. Each bucket slot is {@code MAX_OBJECT_SIZE} bytes wide
 * and holds the handler of the first node of a singly linked chain of
 * {@code MapEntry} items; a slot value of 0 marks an empty bucket.
 *
 * @param <A> the allocator type
 * @param <K> the key type
 * @param <V> the value type
 */
public class DurableHashMapImpl<A extends RestorableAllocator<A>, K, V>
extends DurableHashMap<K, V> implements MemoryDurableEntity<A> {
// Capacity chosen by setCapacityHint when the caller passes 0.
private static final long DEFAULT_MAP_SIZE = 16;
// Fraction of the capacity used to compute the resize threshold.
private static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
// Width in bytes of one bucket slot (and of the chunk that stores the table handler).
private static final long MAX_OBJECT_SIZE = 8;
// Returned by getNativeFieldInfo(); not assigned anywhere in the visible code.
private static long[][] fieldInfo;
// Raw memory accessor obtained from Utils.getUnsafe() during initialization.
private Unsafe unsafe;
// Factory proxies and generic types handed to MapEntryFactory.
private EntityFactoryProxy[] factoryProxy;
// factoryProxy prefixed with a MapEntry proxy; handed to SinglyLinkedNodeFactory.
private EntityFactoryProxy[] listefproxies;
private DurableType[] genericField;
// genericField prefixed with DurableType.DURABLE; handed to SinglyLinkedNodeFactory.
private DurableType[] listgftypes;
// Checked by put() before growing; cleared by resize() when no new chunk could be created.
private volatile boolean autoResize = true;
private volatile boolean autoReclaim;
private volatile ReclaimContext reclaimcontext;
// Chunk that contains the bucket table.
private MemChunkHolder<A> holder;
// Chunk that stores the handler of the bucket-table chunk; its own handler is this map's handler.
private MemChunkHolder<A> chunkAddr;
private A allocator;
* Set initial capacity for a hashmap. It can grow in size.
* @param capacity
* Initial capacity to be set
public void setCapacityHint(long capacity) {
if (0 == capacity) {
totalCapacity = DEFAULT_MAP_SIZE;
} else {
totalCapacity = 1;
while (totalCapacity < capacity) {
totalCapacity <<= 1;
threshold = (long) (totalCapacity * DEFAULT_MAP_LOAD_FACTOR);
* Add a new key-value pair to map
* @param key
* the key to be set
* @param value
* the value to be set
* @return previous value with key else return null
public V put(K key, V value) throws OutOfHybridMemory {
int hash = hash(key.hashCode());
long bucketIndex = getBucketIndex(hash);
long bucketAddr = holder.get() + MAX_OBJECT_SIZE * bucketIndex;
V retVal = addEntry(key, value, bucketAddr);
if (autoResize && (mapSize >= threshold)) {
resize(2 * totalCapacity);
return retVal;
* Add a new key-value pair to map at a given bucket address
* @param key
* the key to be set
* @param value
* the value to be set
* @param bucketAddr
* the addr of the bucket where key is hashed
* @return previous value with key else return null
protected V addEntry(K key, V value, long bucketAddr) throws OutOfHybridMemory {
V retValue = null;
long handler = unsafe.getAddress(bucketAddr);
if (0L == handler) {
SinglyLinkedNode<MapEntry<K, V>> head = null;
MapEntry<K, V> entry = null;
try {
head = SinglyLinkedNodeFactory.create(allocator, listefproxies, listgftypes, false, reclaimcontext);
entry = MapEntryFactory.create(allocator, factoryProxy, genericField, false, reclaimcontext);
} catch (OutOfHybridMemory fe) {
if (null != head) {
if (null != entry) {
throw fe;
entry.setKey(key, false);
entry.setValue(value, false);
head.setItem(entry, false);
unsafe.putLong(bucketAddr, head.getHandler());
} else {
SinglyLinkedNode<MapEntry<K, V>> head = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
SinglyLinkedNode<MapEntry<K, V>> prev = head;
boolean found = false;
while (null != head) {
MapEntry<K, V> mapEntry = head.getItem();
K entryKey = mapEntry.getKey();
if (entryKey == key || entryKey.equals(key)) {
retValue = mapEntry.getValue();
if (retValue instanceof Durable) {
mapEntry.setValue(value, false);
} else {
mapEntry.setValue(value, true);
found = true;
prev = head;
head = head.getNext();
if (true != found) {
SinglyLinkedNode<MapEntry<K, V>> newNode = null;
MapEntry<K, V> entry = null;
try {
newNode = SinglyLinkedNodeFactory.create(allocator, listefproxies, listgftypes,
false, reclaimcontext);
entry = MapEntryFactory.create(allocator, factoryProxy, genericField, false, reclaimcontext);
} catch (OutOfHybridMemory fe) {
if (null != newNode) {
if (null != entry) {
throw fe;
entry.setKey(key, false);
entry.setValue(value, false);
newNode.setItem(entry, false);
prev.setNext(newNode, false);
return retValue;
* Return a value to which key is mapped
* @param key
* the key whose value is to be retrieved
* @return previous value with key else return null
public V get(K key) {
int hash = hash(key.hashCode());
long bucketIndex = getBucketIndex(hash);
long bucketAddr = holder.get() + MAX_OBJECT_SIZE * bucketIndex;
return getEntry(key, bucketAddr);
* Return a value to which key is mapped given a bucket address
* @param key
* the key whose value is to be retrieved
* @param bucketAddr
* the addr of the bucket where key is hashed
* @return previous value with key else return null
protected V getEntry(K key, long bucketAddr) {
V retValue = null;
long handler = unsafe.getAddress(bucketAddr);
if (0L != handler) {
SinglyLinkedNode<MapEntry<K, V>> head = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
while (null != head) {
MapEntry<K, V> mapEntry = head.getItem();
K entryKey = mapEntry.getKey();
if (entryKey == key || entryKey.equals(key)) {
retValue = mapEntry.getValue();
head = head.getNext();
return retValue;
* Remove a mapping for a specified key
* @param key
* the key whose value is to be removed
* @return previous value with key else return null
public V remove(K key) {
int hash = hash(key.hashCode());
long bucketIndex = getBucketIndex(hash);
long bucketAddr = holder.get() + MAX_OBJECT_SIZE * bucketIndex;
return removeEntry(key, bucketAddr);
* Remove a mapping for a specified key at given bucket address
* @param key
* the key whose value is to be removed
* @param bucketAddr
* the addr of the bucket where key is hashed
* @return previous value with key else return null
protected V removeEntry(K key, long bucketAddr) {
V retValue = null;
long handler = unsafe.getAddress(bucketAddr);
if (0L != handler) {
SinglyLinkedNode<MapEntry<K, V>> head = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
SinglyLinkedNode<MapEntry<K, V>> prev = null;
boolean found = false;
while (null != head) {
MapEntry<K, V> mapEntry = head.getItem();
K entryKey = mapEntry.getKey();
if (entryKey == key || entryKey.equals(key)) {
retValue = mapEntry.getValue();
found = true;
prev = head;
head = head.getNext();
if (true == found) {
if (null == prev) {
if (null == head.getNext()) {
unsafe.putAddress(bucketAddr, 0L);
} else {
unsafe.putAddress(bucketAddr, head.getNext().getHandler());
} else {
prev.setNext(head.getNext(), false);
return retValue;
* Rehashes the entire map into a new map of given capacity
* @param newCapacity
* the capacity of new map
public void resize(long newCapacity) {
MemChunkHolder<A> prevHolder = holder;
long bucketAddr = prevHolder.get();
long maxbucketAddr = bucketAddr + MAX_OBJECT_SIZE * totalCapacity;
holder = allocator.createChunk(MAX_OBJECT_SIZE * newCapacity, autoReclaim);
if (null == holder) {
autoResize = false;
holder = prevHolder;
totalCapacity = newCapacity;
threshold = (long) (totalCapacity * DEFAULT_MAP_LOAD_FACTOR);
unsafe.putLong(chunkAddr.get(), allocator.getChunkHandler(holder));
while (bucketAddr < maxbucketAddr) {
long handler = unsafe.getAddress(bucketAddr);
if (0L != handler) {
SinglyLinkedNode<MapEntry<K, V>> head = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
SinglyLinkedNode<MapEntry<K, V>> curr = head;
while (null != curr) {
curr = curr.getNext();
head = curr;
bucketAddr += MAX_OBJECT_SIZE;
* Transfers a map item from old map to the new map
* @param elem
* the item in the old map
protected void transfer(SinglyLinkedNode<MapEntry<K, V>> elem) {
int hash = hash(elem.getItem().getKey().hashCode());
long bucketIndex = getBucketIndex(hash);
long bucketAddr = holder.get() + MAX_OBJECT_SIZE * bucketIndex;
long handler = unsafe.getAddress(bucketAddr);
if (0L != handler) {
SinglyLinkedNode<MapEntry<K, V>> head = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
elem.setNext(head, false);
} else {
elem.setNext(null, false);
unsafe.putLong(bucketAddr, elem.getHandler());
* Recomputes the size of the map during restore without persistence
* @return size of the map
protected long recomputeMapSize() {
long size = 0;
long bucketAddr = holder.get();
long maxbucketAddr = bucketAddr + MAX_OBJECT_SIZE * totalCapacity;
while (bucketAddr < maxbucketAddr) {
long handler = unsafe.getAddress(bucketAddr);
if (0L != handler) {
SinglyLinkedNode<MapEntry<K, V>> head = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
while (null != head) {
head = head.getNext();
bucketAddr += MAX_OBJECT_SIZE;
return size;
public boolean autoReclaim() {
return autoReclaim;
* sync. this object
public void syncToVolatileMemory() {
* Make any cached changes to this object persistent.
public void syncToNonVolatileMemory() {
* flush processors cache for this object
public void syncToLocal() {
public long[][] getNativeFieldInfo() {
return fieldInfo;
public void refbreak() {
public void destroy() throws RetrieveDurableEntityError {
long bucketAddr = holder.get();
long maxbucketAddr = bucketAddr + MAX_OBJECT_SIZE * totalCapacity;
SinglyLinkedNode<MapEntry<K, V>> head, prev;
while (bucketAddr < maxbucketAddr) {
long handler = unsafe.getAddress(bucketAddr);
if (0L != handler) {
head = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
prev = head;
while (null != head) {
head = head.getNext();
prev.destroy(); //TODO: Destroy head in a cascading way
prev = head;
bucketAddr += MAX_OBJECT_SIZE;
public void cancelAutoReclaim() {
autoReclaim = false;
public void registerAutoReclaim() {
public void registerAutoReclaim(ReclaimContext rctx) {
autoReclaim = true;
reclaimcontext = rctx;
public long getHandler() {
return allocator.getChunkHandler(chunkAddr);
public void restoreDurableEntity(A allocator, EntityFactoryProxy[] factoryProxy,
DurableType[] gField, long phandler, boolean autoReclaim, ReclaimContext rctx)
throws RestoreDurableEntityError {
initializeDurableEntity(allocator, factoryProxy, gField, autoReclaim, rctx);
if (0L == phandler) {
throw new RestoreDurableEntityError("Input handler is null on restoreDurableEntity.");
chunkAddr = allocator.retrieveChunk(phandler, autoReclaim, reclaimcontext);
long chunkHandler = unsafe.getLong(chunkAddr.get());
holder = allocator.retrieveChunk(chunkHandler, autoReclaim, reclaimcontext);
if (null == holder || null == chunkAddr) {
throw new RestoreDurableEntityError("Retrieve Entity Failure!");
setCapacityHint(holder.getSize() / MAX_OBJECT_SIZE);
mapSize = recomputeMapSize();
public void initializeDurableEntity(A allocator, EntityFactoryProxy[] factoryProxy,
DurableType[] gField, boolean autoReclaim, ReclaimContext rctx) {
this.allocator = allocator;
this.factoryProxy = factoryProxy;
this.genericField = gField;
this.autoReclaim = autoReclaim;
this.reclaimcontext = rctx;
DurableType gftypes[] = {DurableType.DURABLE};
this.listgftypes = ArrayUtils.addAll(gftypes, genericField);
EntityFactoryProxy efproxies[] = new EntityFactoryProxy[0];
try {
efproxies = new EntityFactoryProxy[]{
new EntityFactoryProxyHelper<MapEntry>(MapEntry.class, 1, reclaimcontext)};
} catch (ClassNotFoundException e) {
throw new ConfigurationException("Class MapEntry not found");
} catch (NoSuchMethodException e) {
throw new ConfigurationException("The methods of class MapEntry not found");
this.listefproxies = ArrayUtils.addAll(efproxies, factoryProxy);
try {
this.unsafe = Utils.getUnsafe();
} catch (Exception e) {
public void createDurableEntity(A allocator, EntityFactoryProxy[] factoryProxy,
DurableType[] gField, boolean autoReclaim, ReclaimContext rctx) throws OutOfHybridMemory {
initializeDurableEntity(allocator, factoryProxy, gField, autoReclaim, rctx);
this.holder = allocator.createChunk(MAX_OBJECT_SIZE * totalCapacity, autoReclaim, reclaimcontext);
this.chunkAddr = allocator.createChunk(MAX_OBJECT_SIZE, autoReclaim, reclaimcontext);
unsafe.putLong(chunkAddr.get(), allocator.getChunkHandler(holder));
if (null == this.holder || null == this.chunkAddr) {
throw new OutOfHybridMemory("Create Durable Entity Error!");
public Iterator<MapEntry<K, V>> iterator() {
return new HashMapItr(this);
private class HashMapItr implements Iterator<MapEntry<K, V>> {
long currentBucketAddr = 0;
long prevBucketAddr = 0;
long maxBucketAddr = 0;
DurableHashMapImpl<A, K, V> map;
SinglyLinkedNode<MapEntry<K, V>> currentNode = null;
SinglyLinkedNode<MapEntry<K, V>> prevNode = null;
SinglyLinkedNode<MapEntry<K, V>> prevPrevNode = null;
HashMapItr(DurableHashMapImpl<A, K, V> map) { = map;
currentBucketAddr = map.holder.get();
maxBucketAddr = currentBucketAddr + MAX_OBJECT_SIZE * map.totalCapacity;
public boolean hasNext() {
return (null != currentNode);
public void nextValidBucket() {
while ((null == currentNode) && (currentBucketAddr < maxBucketAddr)) {
long handler = unsafe.getAddress(currentBucketAddr);
if (0L != handler) {
currentNode = SinglyLinkedNodeFactory.restore(allocator,
listefproxies, listgftypes, handler, false, reclaimcontext);
currentBucketAddr += MAX_OBJECT_SIZE;
public MapEntry<K, V> next() {
if (null != currentNode) {
MapEntry<K, V> entry = currentNode.getItem();
if (prevBucketAddr != currentBucketAddr) {
prevPrevNode = null;
prevBucketAddr = currentBucketAddr;
} else {
prevPrevNode = prevNode;
prevNode = currentNode;
currentNode = currentNode.getNext();
if (null == currentNode) {
currentBucketAddr += MAX_OBJECT_SIZE;
return entry;
} else {
throw new NoSuchElementException();
public void remove() {
if (null == prevPrevNode) {
if (null == prevNode.getNext()) {
unsafe.putAddress(prevBucketAddr, 0L);
} else {
unsafe.putAddress(prevBucketAddr, prevNode.getNext().getHandler());
prevNode = null;
} else {
prevPrevNode.setNext(prevNode.getNext(), false);
prevNode = prevPrevNode;