blob: fb2ef27243c0209816656453fbaa263c9bb31474 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.index.hash;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.index.Index;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
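/**
* Hash index implementation: keys are hashed into bins, and the entries of
* each bin are stored in fixed-size pages of a random-access index file.
*/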
public class HashIndex implements Index, HashIndexMBean {
    /** Default page size in bytes; read from the {@code defaultPageSize} system property (default 1024). */
    public static final int DEFAULT_PAGE_SIZE;
    /** Default key size in bytes; read from the {@code defaultKeySize} system property (default 96). */
    public static final int DEFAULT_KEY_SIZE;
    /** Default number of bins; read from the {@code defaultBinSize} system property (default 1024). */
    public static final int DEFAULT_BIN_SIZE;
    /** Bin count at or above which {@code resize} is disabled; {@code maximumCapacity} system property (default 16384). */
    public static final int MAXIMUM_CAPACITY;
    /** Multiplier applied to the bin count to compute the resize threshold; {@code defaultLoadFactor} system property (default 50). */
    public static final int DEFAULT_LOAD_FACTOR;
    /** {@link #remove(Object)} only considers shrinking once the highest recorded size exceeds this value. */
    private static final int LOW_WATER_MARK = 1024 * 16;
    private static final String NAME_PREFIX = "hash-index-";
    private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);

    static {
        DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024"));
        DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
        DEFAULT_BIN_SIZE = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
        MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
        DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "50"));
    }

    private final String name;
    private final File directory;
    private File file;
    private RandomAccessFile indexFile;
    private final IndexManager indexManager;
    private int pageSize = DEFAULT_PAGE_SIZE;
    private int keySize = DEFAULT_KEY_SIZE;
    private int numberOfBins = DEFAULT_BIN_SIZE;
    private int keysPerPage = this.pageSize / this.keySize;
    private DataByteArrayInputStream dataIn;
    private DataByteArrayOutputStream dataOut;
    private byte[] readBuffer;
    private HashBin[] bins;
    private Marshaller keyMarshaller;
    /** Offset in the index file at which the next newly allocated page is written. */
    private long length;
    private final LinkedList<HashPage> freeList = new LinkedList<HashPage>();
    private final AtomicBoolean loaded = new AtomicBoolean();
    private final LRUCache<Long, HashPage> pageCache;
    private boolean enablePageCaching = false; // this is off by default - see AMQ-1667
    private int pageCacheSize = 10;
    private int size;
    private int highestSize = 0;
    private int activeBins;
    private int threshold;
    private int maximumCapacity = MAXIMUM_CAPACITY;
    private int loadFactor = DEFAULT_LOAD_FACTOR;

    /**
     * Constructor. Opens (creating if necessary) the backing index file; the
     * index content itself is only read on {@link #load()}.
     *
     * @param directory directory that holds the index file
     * @param name logical index name, used to derive the file name
     * @param indexManager used to resolve stored offsets into {@link StoreEntry}s
     * @throws IOException if the index file cannot be opened
     */
    public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
        this.directory = directory;
        this.name = name;
        this.indexManager = indexManager;
        openIndexFile();
        pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
    }

    /**
     * Set the marshaller for key objects
     *
     * @param marshaller marshaller used when pages are read and written
     */
    public synchronized void setKeyMarshaller(Marshaller marshaller) {
        this.keyMarshaller = marshaller;
    }

    /**
     * @return the keySize
     */
    public synchronized int getKeySize() {
        return this.keySize;
    }

    /**
     * Sets the key size. The value is validated before it is assigned so that a
     * rejected call leaves the index untouched, matching
     * {@link #setPageSize(int)} and {@link #setNumberOfBins(int)}.
     *
     * @param keySize the keySize to set
     * @throws RuntimeException if the index is loaded and the value differs from the current one
     */
    public synchronized void setKeySize(int keySize) {
        if (loaded.get() && keySize != this.keySize) {
            throw new RuntimeException("Pages already loaded - can't reset key size");
        }
        this.keySize = keySize;
    }

    /**
     * @return the pageSize
     */
    public synchronized int getPageSize() {
        return this.pageSize;
    }

    /**
     * @param pageSize the pageSize to set
     * @throws RuntimeException if the index is loaded and the value differs from the current one
     */
    public synchronized void setPageSize(int pageSize) {
        if (loaded.get() && pageSize != this.pageSize) {
            throw new RuntimeException("Pages already loaded - can't reset page size");
        }
        this.pageSize = pageSize;
    }

    /**
     * @return number of bins
     */
    public int getNumberOfBins() {
        return this.numberOfBins;
    }

    /**
     * @param numberOfBins requested number of bins; rounded up to a power of two on {@link #load()}
     * @throws RuntimeException if the index is loaded and the value differs from the current one
     */
    public void setNumberOfBins(int numberOfBins) {
        if (loaded.get() && numberOfBins != this.numberOfBins) {
            throw new RuntimeException("Pages already loaded - can't reset bin size");
        }
        this.numberOfBins = numberOfBins;
    }

    /**
     * @return the enablePageCaching
     */
    public synchronized boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching the enablePageCaching to set
     */
    public synchronized void setEnablePageCaching(boolean enablePageCaching) {
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the pageCacheSize
     */
    public synchronized int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize the pageCacheSize to set; also applied to the live page cache
     */
    public synchronized void setPageCacheSize(int pageCacheSize) {
        this.pageCacheSize = pageCacheSize;
        pageCache.setMaxCacheSize(pageCacheSize);
    }

    /**
     * @return always {@code false}
     */
    public synchronized boolean isTransient() {
        return false;
    }

    /**
     * @return the threshold
     */
    public int getThreshold() {
        return threshold;
    }

    /**
     * @param threshold the threshold to set
     */
    public void setThreshold(int threshold) {
        this.threshold = threshold;
    }

    /**
     * @return the loadFactor
     */
    public int getLoadFactor() {
        return loadFactor;
    }

    /**
     * @param loadFactor the loadFactor to set
     */
    public void setLoadFactor(int loadFactor) {
        this.loadFactor = loadFactor;
    }

    /**
     * @return the maximumCapacity
     */
    public int getMaximumCapacity() {
        return maximumCapacity;
    }

    /**
     * @param maximumCapacity the maximumCapacity to set
     */
    public void setMaximumCapacity(int maximumCapacity) {
        this.maximumCapacity = maximumCapacity;
    }

    /**
     * @return the current entry count tracked by this index
     */
    public synchronized int getSize() {
        return size;
    }

    /**
     * @return the number of bins that have been instantiated in the current bin array
     */
    public synchronized int getActiveBins() {
        return activeBins;
    }

    /**
     * Loads the index if it is not loaded yet: sizes the bin array (rounded up
     * to a power of two), allocates the I/O buffers and, when the index file is
     * not empty, compresses and reloads its content.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while loading
     */
    public synchronized void load() {
        if (!loaded.compareAndSet(false, true)) {
            return;
        }
        int capacity = powerOfTwoAtLeast(numberOfBins);
        resetBins(capacity);
        this.numberOfBins = capacity;
        threshold = calculateThreashold();
        keysPerPage = pageSize / keySize;
        dataIn = new DataByteArrayInputStream();
        dataOut = new DataByteArrayOutputStream(pageSize);
        readBuffer = new byte[pageSize];
        try {
            openIndexFile();
            if (indexFile.length() > 0) {
                doCompress();
            }
        } catch (IOException e) {
            LOG.error("Failed to load index ", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Unloads the index if it is loaded: closes the index file and drops the
     * free list, the page cache and all bins.
     *
     * @throws IOException if closing the index file fails
     */
    public synchronized void unload() throws IOException {
        if (loaded.compareAndSet(true, false) && indexFile != null) {
            indexFile.close();
            indexFile = null;
            freeList.clear();
            pageCache.clear();
            resetBins(bins.length);
        }
    }

    /**
     * Stores the offset of {@code value} under {@code key}, loading the index
     * first if necessary, and grows the bin array once the threshold is reached.
     *
     * @param key the key; must be {@link Comparable}
     * @param value the entry whose offset is recorded
     * @throws IOException on index file failure
     */
    public synchronized void store(Object key, StoreEntry value) throws IOException {
        load();
        HashEntry entry = newKeyEntry(key);
        entry.setIndexOffset(value.getOffset());
        if (!getBin(key).put(entry)) {
            this.size++;
        }
        if (this.size >= this.threshold) {
            resize(2 * bins.length);
        }
        if (this.size > this.highestSize) {
            this.highestSize = this.size;
        }
    }

    /**
     * Looks up the entry stored under {@code key}, loading the index first if necessary.
     *
     * @param key the key; must be {@link Comparable}
     * @return the entry resolved through the index manager, or {@code null} if the key is absent
     * @throws IOException on index file failure
     */
    public synchronized StoreEntry get(Object key) throws IOException {
        load();
        HashEntry found = getBin(key).find(newKeyEntry(key));
        return found != null ? indexManager.getIndex(found.getIndexOffset()) : null;
    }

    /**
     * Removes the entry stored under {@code key}, loading the index first if
     * necessary. When the index has shrunk to less than half of its highest
     * recorded size (and that size exceeded the low water mark) the bin array
     * is resized down.
     *
     * @param key the key; must be {@link Comparable}
     * @return the removed entry resolved through the index manager, or {@code null} if the key was absent
     * @throws IOException on index file failure
     */
    public synchronized StoreEntry remove(Object key) throws IOException {
        load();
        StoreEntry result = null;
        HashEntry removed = getBin(key).remove(newKeyEntry(key));
        if (removed != null) {
            this.size--;
            result = this.indexManager.getIndex(removed.getIndexOffset());
        }
        if (this.highestSize > LOW_WATER_MARK && this.highestSize > (this.size * 2)) {
            int newSize = Math.max(128, this.size / this.keysPerPage);
            this.highestSize = 0;
            resize(newSize);
        }
        return result;
    }

    /**
     * @param key the key to test
     * @return {@code true} if {@link #get(Object)} finds an entry for the key
     * @throws IOException on index file failure
     */
    public synchronized boolean containsKey(Object key) throws IOException {
        return get(key) != null;
    }

    /**
     * Deletes the index file and reloads an empty index.
     *
     * @throws IOException on index file failure
     */
    public synchronized void clear() throws IOException {
        unload();
        delete();
        openIndexFile();
        load();
    }

    /**
     * Unloads the index and deletes its file. A failed deletion is logged
     * rather than thrown, so the operation stays best-effort.
     *
     * @throws IOException if unloading fails
     */
    public synchronized void delete() throws IOException {
        unload();
        if (file.exists() && !file.delete()) {
            LOG.warn("Failed to delete index file {}", file);
        }
        length = 0;
    }

    /**
     * Returns the page with the given id, from the cache when page caching is
     * enabled and the page is cached, otherwise read from the index file.
     *
     * @param pageId file offset of the page; negative ids yield {@code null}
     * @return the page, or {@code null} for a negative id
     * @throws IOException on read failure
     * @throws IllegalStateException if the page read from disk is not active
     */
    HashPage lookupPage(long pageId) throws IOException {
        if (pageId < 0) {
            return null;
        }
        HashPage result = getFromCache(pageId);
        if (result != null) {
            return result;
        }
        result = getFullPage(pageId);
        if (result != null) {
            if (!result.isActive()) {
                throw new IllegalStateException("Trying to access an inactive page: " + pageId);
            }
            addToCache(result);
        }
        return result;
    }

    /**
     * Provides a page for the given bin, recycling one from the free list when
     * available and otherwise allocating a new page at the end of the file.
     *
     * @param binId id of the bin the page is created for
     * @return the page, already added to the cache when caching is enabled
     * @throws IOException on write failure
     */
    HashPage createPage(int binId) throws IOException {
        // NOTE(review): a page recycled from the free list does not get
        // setBinId(binId) here - confirm that the caller or HashPage.reset()
        // takes care of the bin id for recycled pages.
        HashPage result = getNextFreePage();
        if (result == null) {
            // allocate one
            result = new HashPage(keysPerPage);
            result.setId(length);
            result.setBinId(binId);
            writePageHeader(result);
            length += pageSize;
            // write a marker at the new end offset so the file covers the whole page
            indexFile.seek(length);
            indexFile.write(HashEntry.NOT_SET);
        }
        addToCache(result);
        return result;
    }

    /**
     * Marks a page inactive, persists its header and puts it on the free list.
     *
     * @param page the page to release
     * @throws IOException on write failure
     */
    void releasePage(HashPage page) throws IOException {
        removeFromCache(page);
        page.reset();
        page.setActive(false);
        writePageHeader(page);
        freeList.add(page);
    }

    /**
     * Takes the first page off the free list, reactivates it and persists its header.
     *
     * @return the recycled page, or {@code null} if the free list is empty
     */
    private HashPage getNextFreePage() throws IOException {
        if (freeList.isEmpty()) {
            return null;
        }
        HashPage result = freeList.removeFirst();
        result.setActive(true);
        result.reset();
        writePageHeader(result);
        return result;
    }

    /**
     * Serializes the whole page and writes it at the page's offset.
     *
     * @param page the page to write
     * @throws IOException if the serialized page exceeds the page size or the write fails
     */
    void writeFullPage(HashPage page) throws IOException {
        dataOut.reset();
        page.write(keyMarshaller, dataOut);
        if (dataOut.size() > pageSize) {
            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
        }
        indexFile.seek(page.getId());
        indexFile.write(dataOut.getData(), 0, dataOut.size());
    }

    /**
     * Writes only the header of the page at the page's offset.
     *
     * @param page the page whose header is written
     * @throws IOException on write failure
     */
    void writePageHeader(HashPage page) throws IOException {
        dataOut.reset();
        page.writeHeader(dataOut);
        indexFile.seek(page.getId());
        indexFile.write(dataOut.getData(), 0, HashPage.PAGE_HEADER_SIZE);
    }

    /**
     * Reads a complete page from the index file.
     *
     * @param id file offset of the page
     * @return the page read from disk
     * @throws IOException on read failure
     */
    HashPage getFullPage(long id) throws IOException {
        indexFile.seek(id);
        indexFile.readFully(readBuffer, 0, pageSize);
        dataIn.restart(readBuffer);
        HashPage page = new HashPage(keysPerPage);
        page.setId(id);
        page.read(keyMarshaller, dataIn);
        return page;
    }

    /**
     * Reads only the header of a page from the index file.
     *
     * @param id file offset of the page
     * @return a page populated from the header only
     * @throws IOException on read failure
     */
    HashPage getPageHeader(long id) throws IOException {
        indexFile.seek(id);
        indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
        dataIn.restart(readBuffer);
        HashPage page = new HashPage(keysPerPage);
        page.setId(id);
        page.readHeader(dataIn);
        return page;
    }

    /**
     * Registers the page with the bin named by its bin id, growing the bin
     * array first when the id is out of range.
     *
     * @param page the page to register
     * @throws IOException if a required resize fails
     */
    void addToBin(HashPage page) throws IOException {
        int index = page.getBinId();
        if (index >= this.bins.length) {
            resize(index + 1);
        }
        getBin(index).addHashPageInfo(page.getId(), page.getPersistedSize());
    }

    /**
     * Returns the bin at the given slot, creating it on first access.
     */
    private HashBin getBin(int index) {
        HashBin result = bins[index];
        if (result == null) {
            result = new HashBin(this, index, pageSize / keySize);
            bins[index] = result;
            activeBins++;
        }
        return result;
    }

    /**
     * Opens the index file (creating parent directories as needed) unless it is already open.
     */
    private void openIndexFile() throws IOException {
        if (indexFile == null) {
            file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
            IOHelper.mkdirs(file.getParentFile());
            indexFile = new RandomAccessFile(file, "rw");
        }
    }

    /**
     * Returns the bin responsible for the given key.
     */
    private HashBin getBin(Object key) {
        return getBin(indexFor(hash(key), bins.length));
    }

    /** Builds a probe entry that carries only the given key. */
    private HashEntry newKeyEntry(Object key) {
        HashEntry entry = new HashEntry();
        entry.setKey((Comparable)key);
        return entry;
    }

    /** Replaces the bin array with an empty one and restarts the active-bin count to match. */
    private void resetBins(int capacity) {
        bins = new HashBin[capacity];
        activeBins = 0;
    }

    private HashPage getFromCache(long pageId) {
        return enablePageCaching ? pageCache.get(pageId) : null;
    }

    private void addToCache(HashPage page) {
        if (enablePageCaching) {
            pageCache.put(page.getId(), page);
        }
    }

    private void removeFromCache(HashPage page) {
        if (enablePageCaching) {
            pageCache.remove(page.getId());
        }
    }

    /**
     * Scans the page headers of the index file: inactive pages go to the free
     * list, active pages are registered with their bin. Does nothing if the
     * index is already flagged as loaded.
     */
    private void doLoad() throws IOException {
        if (!loaded.compareAndSet(false, true)) {
            return;
        }
        // NOTE(review): size is only added to here, never reset beforehand -
        // confirm what HashPage.size() returns after a header-only read when
        // this runs after a resize/compress of an already populated index.
        long offset = 0;
        // the file length is re-read each iteration because addToBin may trigger a resize
        while ((offset + pageSize) <= indexFile.length()) {
            HashPage page = getPageHeader(offset);
            if (page.isActive()) {
                addToBin(page);
                size += page.size();
            } else {
                page.reset();
                freeList.add(page);
            }
            offset += pageSize;
        }
        length = offset;
    }

    /**
     * Rewrites the index file so it only contains active entries, then reloads it.
     */
    private void doCompress() throws IOException {
        HashIndex backIndex = createBackIndex(name + "-COMPRESS", getNumberOfBins());
        File backFile = backIndex.file;
        copyActiveEntriesTo(backIndex);
        replaceIndexFile(backFile);
        openIndexFile();
        doLoad();
    }

    /**
     * Rebuilds the index with a new number of bins (rounded up to a power of
     * two). Once the bin array has reached the maximum capacity, resizing is
     * disabled by raising the threshold to {@link Integer#MAX_VALUE}.
     */
    private void resize(int newCapacity) throws IOException {
        if (bins.length >= getMaximumCapacity()) {
            threshold = Integer.MAX_VALUE;
            return;
        }
        if (newCapacity == numberOfBins) {
            return;
        }
        newCapacity = powerOfTwoAtLeast(newCapacity);
        if (newCapacity == numberOfBins) {
            return;
        }
        LOG.info("Resize hash bins {} from {} to {}", new Object[] {this.name, numberOfBins, newCapacity});
        HashIndex backIndex = createBackIndex(name + "-REISZE", newCapacity);
        File backFile = backIndex.file;
        copyActiveEntriesTo(backIndex);
        replaceIndexFile(backFile);
        setNumberOfBins(newCapacity);
        resetBins(newCapacity);
        threshold = calculateThreashold();
        openIndexFile();
        doLoad();
    }

    /** Creates and loads a temporary index that shares this index's marshaller, key size and page size. */
    private HashIndex createBackIndex(String backFileName, int binCount) throws IOException {
        HashIndex backIndex = new HashIndex(directory, backFileName, indexManager);
        backIndex.setKeyMarshaller(keyMarshaller);
        backIndex.setKeySize(getKeySize());
        backIndex.setNumberOfBins(binCount);
        backIndex.setPageSize(getPageSize());
        backIndex.load();
        return backIndex;
    }

    /** Copies every entry of every active page into the temporary index, then unloads (closes) it. */
    private void copyActiveEntriesTo(HashIndex backIndex) throws IOException {
        boolean copied = false;
        try {
            // only backIndex's file is written during the copy, so this length is stable
            final long fileLength = indexFile.length();
            for (long offset = 0; (offset + pageSize) <= fileLength; offset += pageSize) {
                HashPage page = getFullPage(offset);
                if (page.isActive()) {
                    for (HashEntry entry : page.getEntries()) {
                        backIndex.getBin(entry.getKey()).put(entry);
                        backIndex.size++;
                    }
                }
            }
            copied = true;
        } finally {
            if (!copied) {
                // release the temporary file handle without masking the original failure
                try {
                    backIndex.unload();
                } catch (IOException e) {
                    LOG.warn("Failed to close temporary index " + backIndex, e);
                }
            }
        }
        backIndex.unload();
    }

    /** Unloads this index and replaces its file with the content of the temporary file, which is then deleted. */
    private void replaceIndexFile(File backFile) throws IOException {
        unload();
        IOHelper.deleteFile(file);
        IOHelper.copyFile(backFile, file);
        IOHelper.deleteFile(backFile);
    }

    /**
     * Computes the entry count at which the bin array is grown: bin count times
     * load factor, evaluated in {@code long} and clamped so it cannot overflow.
     */
    private int calculateThreashold() {
        long value = (long) bins.length * loadFactor;
        return (int) Math.min((long) Integer.MAX_VALUE, value);
    }

    /**
     * Returns the smallest power of two that is not below {@code value},
     * clamped to 2^30 so the shift cannot overflow and loop forever.
     */
    private static int powerOfTwoAtLeast(int value) {
        int capacity = 1;
        while (capacity < value && capacity < (1 << 30)) {
            capacity <<= 1;
        }
        return capacity;
    }

    /**
     * @return a description made of the identity hash code and the index file name
     */
    @Override
    public String toString() {
        return "HashIndex" + System.identityHashCode(this) + ": " + file.getName();
    }

    /**
     * Supplemental hash: scrambles the bits of {@code x.hashCode()} with a
     * sequence of shifts, adds and xors before it is masked by {@link #indexFor(int, int)}.
     *
     * @param x the key; must not be {@code null}
     * @return the scrambled hash
     */
    static int hash(Object x) {
        int h = x.hashCode();
        h += ~(h << 9);
        h ^= h >>> 14;
        h += h << 4;
        h ^= h >>> 10;
        return h;
    }

    /**
     * Maps a hash to a slot by masking with {@code length - 1}; the mask only
     * covers every slot when {@code length} is a power of two.
     *
     * @param h the hash value
     * @param length the bin array length
     * @return the slot index
     */
    static int indexFor(int h, int length) {
        return h & (length - 1);
    }
}