blob: 7b50c04dfec34c72097c77222fcc8bc777229a69 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
* A BTree abstract class containing the methods shared by the PersistedBTree or the InMemoryBTree
* implementations.
* @param <K> The Key type
* @param <V> The Value type
* @author <a href="">Apache Directory Project</a>
/* No qualifier*/abstract class AbstractBTree<K, V> implements BTree<K, V>
/** The read transaction timeout */
protected long readTimeOut = DEFAULT_READ_TIMEOUT;
/** The current Header for a managed BTree */
protected BTreeHeader<K, V> currentBtreeHeader;
/** The Key serializer used for this tree.*/
protected ElementSerializer<K> keySerializer;
/** The Value serializer used for this tree. */
protected ElementSerializer<V> valueSerializer;
/** The list of read transactions being executed */
protected ConcurrentLinkedQueue<ReadTransaction<K, V>> readTransactions;
/** The size of the buffer used to write data in disk */
protected int writeBufferSize;
/** Flag to enable duplicate key support */
protected boolean allowDuplicates;
/** The number of elements in a page for this B-tree */
protected int pageSize;
/** The BTree name */
protected String name;
/** The FQCN of the Key serializer */
protected String keySerializerFQCN;
/** The FQCN of the Value serializer */
protected String valueSerializerFQCN;
/** The thread responsible for the cleanup of timed out reads */
protected Thread readTransactionsThread;
/** The BTree type : either in-memory, disk backed or persisted */
protected BTreeTypeEnum btreeType;
/** The current transaction */
protected AtomicBoolean transactionStarted = new AtomicBoolean( false );
/** The map of all the used BtreeHeaders */
protected Map<Long, BTreeHeader<K, V>> btreeRevisions = new ConcurrentHashMap<Long, BTreeHeader<K, V>>();
/** The current revision */
protected AtomicLong currentRevision = new AtomicLong( 0L );
/** The TransactionManager used for this BTree */
protected TransactionManager transactionManager;
/** The size of the stack to use to manage tree searches */
private final static int MAX_STACK_DEPTH = 32;
* Starts a Read Only transaction. If the transaction is not closed, it will be
* automatically closed after the timeout
* @return The created transaction
protected abstract ReadTransaction<K, V> beginReadTransaction();
* Starts a Read Only transaction. If the transaction is not closed, it will be
* automatically closed after the timeout
* @return The created transaction
protected abstract ReadTransaction<K, V> beginReadTransaction( long revision );
* {@inheritDoc}
public TupleCursor<K, V> browse() throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction();
if ( transaction == null )
return new EmptyTupleCursor<K, V>();
ParentPos<K, V>[] stack = ( ParentPos<K, V>[] ) Array.newInstance( ParentPos.class, MAX_STACK_DEPTH );
TupleCursor<K, V> cursor = getRootPage().browse( transaction, stack, 0 );
// Set the position before the first element
return cursor;
* {@inheritDoc}
public TupleCursor<K, V> browse( long revision ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction( revision );
if ( transaction == null )
return new EmptyTupleCursor<K, V>();
ParentPos<K, V>[] stack = ( ParentPos<K, V>[] ) Array.newInstance( ParentPos.class, MAX_STACK_DEPTH );
// And get the cursor
TupleCursor<K, V> cursor = getRootPage( transaction.getRevision() ).browse( transaction, stack, 0 );
return cursor;
* {@inheritDoc}
public TupleCursor<K, V> browseFrom( K key ) throws IOException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction();
ParentPos<K, V>[] stack = ( ParentPos<K, V>[] ) Array.newInstance( ParentPos.class, MAX_STACK_DEPTH );
TupleCursor<K, V> cursor = getRootPage( transaction.getRevision() ).browse( key, transaction, stack, 0 );
return cursor;
catch ( KeyNotFoundException e )
throw new IOException( e.getMessage() );
* {@inheritDoc}
public TupleCursor<K, V> browseFrom( long revision, K key ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction( revision );
if ( transaction == null )
return new EmptyTupleCursor<K, V>();
ParentPos<K, V>[] stack = ( ParentPos<K, V>[] ) Array.newInstance( ParentPos.class, MAX_STACK_DEPTH );
// And get the cursor
TupleCursor<K, V> cursor = getRootPage( transaction.getRevision() ).browse( key, transaction, stack, 0 );
return cursor;
* {@inheritDoc}
public boolean contains( K key, V value ) throws IOException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction();
if ( transaction == null )
return false;
return getRootPage( transaction.getRevision() ).contains( key, value );
catch ( KeyNotFoundException knfe )
throw new IOException( knfe.getMessage() );
* {@inheritDoc}
public boolean contains( long revision, K key, V value ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
// Fetch the root page for this revision
ReadTransaction<K, V> transaction = beginReadTransaction( revision );
if ( transaction == null )
return false;
return getRootPage( transaction.getRevision() ).contains( key, value );
* {@inheritDoc}
public Tuple<K, V> delete( K key ) throws IOException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
if ( key == null )
throw new IllegalArgumentException( "Key must not be null" );
// Take the lock if it's not already taken by another thread
Tuple<K, V> deleted = delete( key, currentRevision.get() + 1 );
// Commit now
return deleted;
catch ( IOException ioe )
// We have had an exception, we must rollback the transaction
return null;
* {@inheritDoc}
public Tuple<K, V> delete( K key, V value ) throws IOException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
if ( key == null )
throw new IllegalArgumentException( "Key must not be null" );
if ( value == null )
throw new IllegalArgumentException( "Value must not be null" );
Tuple<K, V> deleted = delete( key, value, currentRevision.get() + 1 );
return deleted;
catch ( IOException ioe )
throw ioe;
* Delete the entry which key is given as a parameter. If the entry exists, it will
* be removed from the tree, the old tuple will be returned. Otherwise, null is returned.
* @param key The key for the entry we try to remove
* @return A Tuple<K, V> containing the removed entry, or null if it's not found.
/*no qualifier*/Tuple<K, V> delete( K key, long revision ) throws IOException
return delete( key, null, revision );
/*no qualifier*/abstract Tuple<K, V> delete( K key, V value, long revision ) throws IOException;
* {@inheritDoc}
public V insert( K key, V value ) throws IOException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
V existingValue = null;
if ( key == null )
throw new IllegalArgumentException( "Key must not be null" );
// Take the lock if it's not already taken by another thread and if we
// aren't on a sub-btree
if ( btreeType != BTreeTypeEnum.PERSISTED_SUB )
InsertResult<K, V> result = insert( key, value, -1L );
if ( result instanceof ExistsResult )
existingValue = value;
else if ( result instanceof ModifyResult )
existingValue = ( ( ModifyResult<K, V> ) result ).getModifiedValue();
// Commit now if it's not a sub-btree
if ( btreeType != BTreeTypeEnum.PERSISTED_SUB )
//FIXME when result type is ExistsResult then we should avoid writing the headers
return existingValue;
catch ( IOException ioe )
// We have had an exception, we must rollback the transaction
// if it's not a sub-btree
if ( btreeType != BTreeTypeEnum.PERSISTED_SUB )
return null;
catch ( DuplicateValueNotAllowedException e )
// We have had an exception, we must rollback the transaction
// if it's not a sub-btree
if ( btreeType != BTreeTypeEnum.PERSISTED_SUB )
throw e;
* {@inheritDoc}
/* no qualifier */abstract InsertResult<K, V> insert( K key, V value, long revision ) throws IOException;
* Flush the latest revision to disk. We will replace the current file by the new one, as
* we flush in a temporary file.
public void flush() throws IOException
* {@inheritDoc}
public V get( K key ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction();
if ( transaction == null )
return null;
return getRootPage( transaction.getRevision() ).get( key );
* {@inheritDoc}
public V get( long revision, K key ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction( revision );
if ( transaction == null )
return null;
return getRootPage( transaction.getRevision() ).get( key );
* {@inheritDoc}
public abstract Page<K, V> getRootPage();
* {@inheritDoc}
/* no qualifier */abstract void setRootPage( Page<K, V> root );
* {@inheritDoc}
public ValueCursor<V> getValues( K key ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction();
if ( transaction == null )
return new EmptyValueCursor<V>( 0L );
return getRootPage( transaction.getRevision() ).getValues( key );
* {@inheritDoc}
public boolean hasKey( K key ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
if ( key == null )
return false;
ReadTransaction<K, V> transaction = beginReadTransaction();
if ( transaction == null )
return false;
return getRootPage( transaction.getRevision() ).hasKey( key );
* {@inheritDoc}
public boolean hasKey( long revision, K key ) throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
if ( key == null )
return false;
ReadTransaction<K, V> transaction = beginReadTransaction( revision );
if ( transaction == null )
return false;
return getRootPage( transaction.getRevision() ).hasKey( key );
* {@inheritDoc}
public ElementSerializer<K> getKeySerializer()
return keySerializer;
* {@inheritDoc}
public void setKeySerializer( ElementSerializer<K> keySerializer )
this.keySerializer = keySerializer;
keySerializerFQCN = keySerializer.getClass().getName();
* {@inheritDoc}
public String getKeySerializerFQCN()
return keySerializerFQCN;
* {@inheritDoc}
public ElementSerializer<V> getValueSerializer()
return valueSerializer;
* {@inheritDoc}
public void setValueSerializer( ElementSerializer<V> valueSerializer )
this.valueSerializer = valueSerializer;
valueSerializerFQCN = valueSerializer.getClass().getName();
* {@inheritDoc}
public String getValueSerializerFQCN()
return valueSerializerFQCN;
* {@inheritDoc}
public long getRevision()
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction();
if ( transaction == null )
return -1L;
return transaction.getRevision();
* {@inheritDoc}
/* no qualifier */void setRevision( long revision )
transactionManager.getBTreeHeader( getName() ).setRevision( revision );
* Store the new revision in the map of btrees, increment the current revision
protected void storeRevision( BTreeHeader<K, V> btreeHeader, boolean keepRevisions )
long revision = btreeHeader.getRevision();
if ( keepRevisions )
synchronized ( btreeRevisions )
btreeRevisions.put( revision, btreeHeader );
currentRevision.set( revision );
currentBtreeHeader = btreeHeader;
// And update the newBTreeHeaders map
if ( btreeHeader.getBtree().getType() != BTreeTypeEnum.PERSISTED_SUB )
transactionManager.updateNewBTreeHeaders( btreeHeader );
* Store the new revision in the map of btrees, increment the current revision
protected void storeRevision( BTreeHeader<K, V> btreeHeader )
long revision = btreeHeader.getRevision();
synchronized ( btreeRevisions )
btreeRevisions.put( revision, btreeHeader );
currentRevision.set( revision );
currentBtreeHeader = btreeHeader;
// And update the newBTreeHeaders map
if ( btreeHeader.getBtree().getType() != BTreeTypeEnum.PERSISTED_SUB )
transactionManager.updateNewBTreeHeaders( btreeHeader );
* {@inheritDoc}
public long getReadTimeOut()
return readTimeOut;
* {@inheritDoc}
public void setReadTimeOut( long readTimeOut )
this.readTimeOut = readTimeOut;
* {@inheritDoc}
public long getNbElems()
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a transactionLManager" );
ReadTransaction<K, V> transaction = beginReadTransaction();
if ( transaction == null )
return -1L;
return transaction.getBtreeHeader().getNbElems();
* {@inheritDoc}
/* no qualifier */void setNbElems( long nbElems )
transactionManager.getBTreeHeader( getName() ).setNbElems( nbElems );
* {@inheritDoc}
public int getPageSize()
return pageSize;
* {@inheritDoc}
public void setPageSize( int pageSize )
if ( pageSize <= 2 )
this.pageSize = DEFAULT_PAGE_SIZE;
this.pageSize = getPowerOf2( pageSize );
* {@inheritDoc}
public String getName()
return name;
* {@inheritDoc}
public void setName( String name )
{ = name;
* {@inheritDoc}
public Comparator<K> getKeyComparator()
return keySerializer.getComparator();
* {@inheritDoc}
public Comparator<V> getValueComparator()
return valueSerializer.getComparator();
* {@inheritDoc}
public int getWriteBufferSize()
return writeBufferSize;
* {@inheritDoc}
public void setWriteBufferSize( int writeBufferSize )
this.writeBufferSize = writeBufferSize;
* {@inheritDoc}
public boolean isAllowDuplicates()
return allowDuplicates;
* {@inheritDoc}
public void setAllowDuplicates( boolean allowDuplicates )
this.allowDuplicates = allowDuplicates;
* {@inheritDoc}
public BTreeTypeEnum getType()
return btreeType;
* @param type the type to set
public void setType( BTreeTypeEnum type )
this.btreeType = type;
* Gets the number which is a power of 2 immediately above the given positive number.
private int getPowerOf2( int size )
int newSize = --size;
newSize |= newSize >> 1;
newSize |= newSize >> 2;
newSize |= newSize >> 4;
newSize |= newSize >> 8;
newSize |= newSize >> 16;
return newSize;
* @return The current BtreeHeader
protected BTreeHeader<K, V> getBtreeHeader()
return currentBtreeHeader;
* @return The current BtreeHeader
protected BTreeHeader<K, V> getBtreeHeader( long revision )
return btreeRevisions.get( revision );
* {@inheritDoc}
public KeyCursor<K> browseKeys() throws IOException, KeyNotFoundException
// Check that we have a TransactionManager
if ( transactionManager == null )
throw new BTreeCreationException( "We don't have a Transaction Manager" );
ReadTransaction transaction = beginReadTransaction();
if ( transaction == null )
return new KeyCursor<K>();
ParentPos<K, K>[] stack = ( ParentPos<K, K>[] ) Array.newInstance( ParentPos.class, MAX_STACK_DEPTH );
KeyCursor<K> cursor = getRootPage().browseKeys( transaction, stack, 0 );
// Set the position before the first element
return cursor;
* Create a thread that is responsible of cleaning the transactions when
* they hit the timeout
/*no qualifier*/void createTransactionManager()
Runnable readTransactionTask = new Runnable()
public void run()
ReadTransaction<K, V> transaction = null;
while ( !Thread.currentThread().isInterrupted() )
long timeoutDate = System.currentTimeMillis() - readTimeOut;
long t0 = System.currentTimeMillis();
int nbTxns = 0;
// Loop on all the transactions from the queue
while ( ( transaction = readTransactions.peek() ) != null )
if ( transaction.isClosed() )
// The transaction is already closed, remove it from the queue
// Check if the transaction has timed out
if ( transaction.getCreationDate() < timeoutDate )
synchronized ( btreeRevisions )
btreeRevisions.remove( transaction.getRevision() );
// We need to stop now
long t1 = System.currentTimeMillis();
// Wait until we reach the timeout
Thread.sleep( readTimeOut );
catch ( InterruptedException ie )
//System.out.println( "Interrupted" );
catch ( Exception e )
throw new RuntimeException( e );
readTransactionsThread = new Thread( readTransactionTask );
readTransactionsThread.setDaemon( true );