/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.directory.mavibot.btree;


import static org.apache.directory.mavibot.btree.BTreeTypeEnum.PERSISTED_SUB;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.directory.mavibot.btree.exception.BTreeAlreadyManagedException;
import org.apache.directory.mavibot.btree.exception.BTreeCreationException;
import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
import org.apache.directory.mavibot.btree.exception.FileException;
import org.apache.directory.mavibot.btree.exception.InvalidOffsetException;
import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
import org.apache.directory.mavibot.btree.exception.RecordManagerException;
import org.apache.directory.mavibot.btree.serializer.ElementSerializer;
import org.apache.directory.mavibot.btree.serializer.IntSerializer;
import org.apache.directory.mavibot.btree.serializer.LongArraySerializer;
import org.apache.directory.mavibot.btree.serializer.LongSerializer;
import org.apache.directory.mavibot.btree.serializer.StringSerializer;
import org.apache.directory.mavibot.btree.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * The RecordManager is used to manage the file in which we will store the B-trees.
 * A RecordManager will manage more than one B-tree.<br/>
 *
 * It stores data in fixed size pages (default size is 512 bytes), which may be linked one to
 * the other if the data we want to store is too big for a page.
 *
 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
 */
public class RecordManager extends AbstractTransactionManager
{
    /** The LoggerFactory used by this class */
    protected static final Logger LOG = LoggerFactory.getLogger( RecordManager.class );

    /** The LoggerFactory used by this class */
    protected static final Logger LOG_PAGES = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_PAGES" );

    /** A dedicated logger for the check */
    protected static final Logger LOG_CHECK = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_CHECK" );

    /** The associated file */
    private File file;

    /** The channel used to read and write data */
    /* no qualifier */ FileChannel fileChannel;

    /** The number of managed B-trees */
    /* no qualifier */ int nbBtree;

    /** The first and last free page */
    /* no qualifier */ long firstFreePage;

    /** The list of available free pages */
    List<PageIO> freePages = new ArrayList<PageIO>();

    /** Some counters to track the number of free pages */
    public AtomicLong nbFreedPages = new AtomicLong( 0 );
    public AtomicLong nbCreatedPages = new AtomicLong( 0 );
    public AtomicLong nbReusedPages = new AtomicLong( 0 );
    public AtomicLong nbUpdateRMHeader = new AtomicLong( 0 );
    public AtomicLong nbUpdateBtreeHeader = new AtomicLong( 0 );
    public AtomicLong nbUpdatePageIOs = new AtomicLong( 0 );

    /** The offset of the end of the file */
    private long endOfFileOffset;

    /**
     * A Map used to hold the pages that were copied in a new version.
     * Those pages can be reclaimed when the associated version is dead.
     * 
     * Note: the offsets are of AbstractPageS' while freeing the associated
     *       PageIOs will be fetched and freed.
     **/
    /* no qualifier */ Map<RevisionName, long[]> copiedPageMap = null;

    /** A constant for an offset on a non existing page */
    public static final long NO_PAGE = -1L;

    /** The number of bytes needed to store the data size of a page */
    private static final int PAGE_DATA_SIZE_LENGTH = 4;

    /** The size of the link to next page */
    private static final int LINK_SIZE = 8;

    /** Some constants */
    private static final int BYTE_SIZE = 1;
    /* no qualifier */ static final int INT_SIZE = 4;
    /* no qualifier */ static final int LONG_SIZE = 8;

    /** The default page size */
    public static final int DEFAULT_PAGE_SIZE = 512;

    /** The minimal page size. Can't be below 64, as we have to store many thing sin the RMHeader */
    private static final int MIN_PAGE_SIZE = 64;

    /** The RecordManager header size */
    /* no qualifier */ static int RECORD_MANAGER_HEADER_SIZE = DEFAULT_PAGE_SIZE;

    /** A global buffer used to store the RecordManager header */
    private ByteBuffer RECORD_MANAGER_HEADER_BUFFER;

    /** A static buffer used to store the RecordManager header */
    private byte[] RECORD_MANAGER_HEADER_BYTES;

    /** The length of an Offset, as a negative value */
    private  byte[] LONG_LENGTH = new byte[]
        { ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xF8 };

    /** The RecordManager underlying page size. */
    /* no qualifier */ int pageSize = DEFAULT_PAGE_SIZE;

    /** The set of managed B-trees */
    private Map<String, BTree<Object, Object>> managedBtrees;
    
    /** The queue of recently closed transactions */
    private Queue<RevisionName> closedTransactionsQueue = new LinkedBlockingQueue<RevisionName>();

    /** The default file name */
    private static final String DEFAULT_FILE_NAME = "mavibot.db";

    /** A flag set to true if we want to keep old revisions */
    private boolean keepRevisions;

    /** A flag used by internal btrees */
    public static final boolean INTERNAL_BTREE = true;

    /** A flag used by internal btrees */
    public static final boolean NORMAL_BTREE = false;

    /** The B-tree of B-trees */
    /* no qualifier */ BTree<String, RevisionAndHeaderOffset> btreeOfBtrees;

    /** The B-tree of B-trees management btree name */
    /* no qualifier */ static final String BTREE_OF_BTREES_NAME = "_btree_of_btrees_";

    /** The CopiedPages management btree name */
    /* no qualifier */ static final String COPIED_PAGE_BTREE_NAME = "_copiedPageBtree_";

    /** The current B-tree of B-trees header offset */
    /* no qualifier */ long currentBtreeOfBtreesOffset;

    /** The previous B-tree of B-trees header offset */
    private long previousBtreeOfBtreesOffset = NO_PAGE;

    /** A lock to protect the transaction handling */
    private Lock transactionLock = new ReentrantLock();
    
    /** A ThreadLocalStorage used to store the current transaction */
    private static final ThreadLocal<Integer> context = new ThreadLocal<Integer>();

    /** The list of PageIO that can be freed after a commit */
    List<PageIO> freedPages = new ArrayList<PageIO>();

    /** The list of PageIO that can be freed after a roolback */
    private List<PageIO> allocatedPages = new ArrayList<PageIO>();
    
    /** A Map keeping the latest revisions for each managed BTree */
    private Map<String, BTreeHeader<?, ?>> currentBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();

    /** A Map storing the new revisions when some change have been made in some BTrees */
    private Map<String, BTreeHeader<?, ?>> newBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();
    
    /** A lock to protect the BtreeHeader maps */
    private ReadWriteLock btreeHeadersLock = new ReentrantReadWriteLock();
    
    /** A value stored into the transaction context for rollbacked transactions */
    private static final int ROLLBACKED_TXN = 0;

    /** A lock to protect the freepage pointers */
    private ReentrantLock freePageLock = new ReentrantLock();

    /** the space reclaimer */
    private SpaceReclaimer reclaimer;
    
    /** variable to keep track of the write commit count */
    private int commitCount = 0;
    
    /** the threshold at which the SpaceReclaimer will be run to free the copied pages */
    private int spaceReclaimerThreshold = 200;
    
    /**
     * Create a Record manager which will either create the underlying file
     * or load an existing one. If a folder is provided, then we will create
     * a file with a default name : mavibot.db
     *
     * @param name The file name, or a folder name
     */
    public RecordManager( String fileName )
    {
        this( fileName, DEFAULT_PAGE_SIZE );
    }


    /**
     * Create a Record manager which will either create the underlying file
     * or load an existing one. If a folder is provider, then we will create
     * a file with a default name : mavibot.db
     *
     * @param name The file name, or a folder name
     * @param pageSize the size of a page on disk, in bytes
     */
    public RecordManager( String fileName, int pageSize )
    {
        managedBtrees = new LinkedHashMap<String, BTree<Object, Object>>();

        if ( pageSize < MIN_PAGE_SIZE )
        {
            this.pageSize = MIN_PAGE_SIZE;
        }
        else
        {
            this.pageSize = pageSize;
        }

        RECORD_MANAGER_HEADER_BUFFER = ByteBuffer.allocate( this.pageSize );
        RECORD_MANAGER_HEADER_BYTES = new byte[this.pageSize];
        RECORD_MANAGER_HEADER_SIZE = this.pageSize;

        // Open the file or create it
        File tmpFile = new File( fileName );

        if ( tmpFile.isDirectory() )
        {
            // It's a directory. Check that we don't have an existing mavibot file
            tmpFile = new File( tmpFile, DEFAULT_FILE_NAME );
        }

        // We have to create a new file, if it does not already exist
        boolean isNewFile = createFile( tmpFile );

        try
        {
            RandomAccessFile randomFile = new RandomAccessFile( file, "rw" );
            fileChannel = randomFile.getChannel();

            // get the current end of file offset
            endOfFileOffset = fileChannel.size();

            if ( isNewFile )
            {
                initRecordManager();
            }
            else
            {
                loadRecordManager();
            }
            
            reclaimer = new SpaceReclaimer( this );
            
            copiedPageMap = reclaimer.readCopiedPageMap( file.getParentFile() );
            runReclaimer();
        }
        catch ( Exception e )
        {
            LOG.error( "Error while initializing the RecordManager : {}", e.getMessage() );
            LOG.error( "", e );
            throw new RecordManagerException( e );
        }
    }

    
    /**
     * runs the SpaceReclaimer to free the copied pages
     */
    private void runReclaimer()
    {
        try
        {
            commitCount = 0;
            reclaimer.reclaim();
        }
        catch( Exception e )
        {
            LOG.warn( "SpaceReclaimer failed to free the pages", e );
        }
    }

    
    /**
     * Create the mavibot file if it does not exist
     */
    private boolean createFile( File mavibotFile )
    {
        try
        {
            boolean creation = mavibotFile.createNewFile();

            file = mavibotFile;

            if ( mavibotFile.length() == 0 )
            {
                return true;
            }
            else
            {
                return creation;
            }
        }
        catch ( IOException ioe )
        {
            LOG.error( "Cannot create the file {}", mavibotFile.getName() );
            return false;
        }
    }


    /**
     * We will create a brand new RecordManager file, containing nothing, but the RecordManager header,
     * a B-tree to manage the old revisions we want to keep and
     * a B-tree used to manage pages associated with old versions.
     * <br/>
     * The RecordManager header contains the following details :
     * <pre>
     * +--------------------------+
     * | PageSize                 | 4 bytes : The size of a physical page (default to {@link #DEFAULT_PAGE_SIZE})
     * +--------------------------+
     * |  NbTree                  | 4 bytes : The number of managed B-trees (at least 1)
     * +--------------------------+
     * | FirstFree                | 8 bytes : The offset of the first free page
     * +--------------------------+
     * | current BoB offset       | 8 bytes : The offset of the current BoB
     * +--------------------------+
     * | previous BoB offset      | 8 bytes : The offset of the previous BoB
     * +--------------------------+
     * </pre>
     *
     * We then store the B-tree managing the pages that have been copied when we have added
     * or deleted an element in the B-tree. They are associated with a version.
     *
     * Last, we add the bTree that keep a track on each revision we can have access to.
     */
    private void initRecordManager() throws IOException
    {
        // Create a new Header
        nbBtree = 0;
        firstFreePage = NO_PAGE;
        currentBtreeOfBtreesOffset = 0L;

        updateRecordManagerHeader();

        // Set the offset of the end of the file
        endOfFileOffset = fileChannel.size();

        // First, create the btree of btrees <NameRevision, Long>
        createBtreeOfBtrees();

        // Inject these B-trees into the RecordManager. They are internal B-trees.
        try
        {
            manage( btreeOfBtrees, INTERNAL_BTREE );

            currentBtreeOfBtreesOffset = ((PersistedBTree<String, RevisionAndHeaderOffset>)btreeOfBtrees).getBtreeHeader().getBTreeHeaderOffset();
            updateRecordManagerHeader();
            
            // Inject the BtreeOfBtrees into the currentBtreeHeaders map
            currentBTreeHeaders.put( BTREE_OF_BTREES_NAME,  ((PersistedBTree<String, RevisionAndHeaderOffset>)btreeOfBtrees).getBtreeHeader() );
            newBTreeHeaders.put( BTREE_OF_BTREES_NAME,  ((PersistedBTree<String, RevisionAndHeaderOffset>)btreeOfBtrees).getBtreeHeader() );
        }
        catch ( BTreeAlreadyManagedException btame )
        {
            // Can't happen here.
        }

        // We are all set ! Verify the file
        if ( LOG_CHECK.isDebugEnabled() )
        {
            MavibotInspector.check( this );
        }

    }


    /**
     * Create the B-treeOfBtrees
     */
    private void createBtreeOfBtrees()
    {
        PersistedBTreeConfiguration<String, RevisionAndHeaderOffset> configuration = new PersistedBTreeConfiguration<String, RevisionAndHeaderOffset>();
        configuration.setKeySerializer( StringSerializer.INSTANCE );
        configuration.setName( BTREE_OF_BTREES_NAME );
        configuration.setValueSerializer( RevisionAndHeaderOffsetSerializer.INSTANCE );
        configuration.setBtreeType( BTreeTypeEnum.BTREE_OF_BTREES );
        configuration.setCacheSize( PersistedBTree.DEFAULT_CACHE_SIZE );
        configuration.setAllowDuplicates( true );

        btreeOfBtrees = BTreeFactory.createPersistedBTree( configuration );
    }


    /**
     * Load the BTrees from the disk.
     *
     * @throws InstantiationException
     * @throws IllegalAccessException
     * @throws ClassNotFoundException
     * @throws NoSuchFieldException
     * @throws SecurityException
     * @throws IllegalArgumentException
     */
    private void loadRecordManager() throws IOException, ClassNotFoundException, IllegalAccessException,
        InstantiationException, IllegalArgumentException, SecurityException, NoSuchFieldException, KeyNotFoundException
    {
        if ( fileChannel.size() != 0 )
        {
            ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE );

            // The file exists, we have to load the data now
            fileChannel.read( recordManagerHeader );

            recordManagerHeader.rewind();

            // read the RecordManager Header :
            // +---------------------+
            // | PageSize            | 4 bytes : The size of a physical page (default to DEFAULT_PAGE_SIZE)
            // +---------------------+
            // | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
            // +---------------------+
            // | FirstFree           | 8 bytes : The offset of the first free page
            // +---------------------+
            // | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
            // +---------------------+
            // | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
            // +---------------------+

            // The page size
            pageSize = recordManagerHeader.getInt();

            // The number of managed B-trees
            nbBtree = recordManagerHeader.getInt();

            // The first and last free page
            firstFreePage = recordManagerHeader.getLong();

            // The current BOB offset
            currentBtreeOfBtreesOffset = recordManagerHeader.getLong();

            // The previous BOB offset
            previousBtreeOfBtreesOffset = recordManagerHeader.getLong();

            // read the B-tree of B-trees
            PageIO[] bobHeaderPageIos = readPageIOs( currentBtreeOfBtreesOffset, Long.MAX_VALUE );

            btreeOfBtrees = BTreeFactory.<String, RevisionAndHeaderOffset> createPersistedBTree( BTreeTypeEnum.BTREE_OF_BTREES );
            //BTreeFactory.<NameRevision, Long> setBtreeHeaderOffset( ( PersistedBTree<NameRevision, Long> )btreeOfBtrees, currentBtreeOfBtreesOffset );

            loadBtree( bobHeaderPageIos, btreeOfBtrees );

            // Now, read all the B-trees from the btree of btrees
            TupleCursor<String, RevisionAndHeaderOffset> btreeCursor = btreeOfBtrees.browse();
            Map<String, RevisionAndHeaderOffset> loadedBtrees = new HashMap<String, RevisionAndHeaderOffset>();

            while ( btreeCursor.hasNext() )
            {
                Tuple<String, RevisionAndHeaderOffset> btreeTuple = btreeCursor.next();
                String name = btreeTuple.getKey();
                ValueCursor<RevisionAndHeaderOffset> cursor = btreeOfBtrees.getValues( name );
                
                cursor.afterLast();
                RevisionAndHeaderOffset revisionHeader = cursor.prev();

                // This is a new B-tree
                loadedBtrees.put( name, revisionHeader );
                
                if ( cursor instanceof ValueBTreeCursor )
                {
                    PersistedBTree subBtree = ( PersistedBTree ) ( ( ValueBTreeCursor ) cursor ).getValueBTree();
                    _deleteBTree( subBtree, 0 );
                }
                
                cursor.close();
            }

            btreeCursor.close();
            
            // Now, we can load the real btrees using the offsets
            for ( String btreeName : loadedBtrees.keySet() )
            {
                RevisionAndHeaderOffset headerOffset = loadedBtrees.get( btreeName );

                PageIO[] btreePageIos = readPageIOs( headerOffset.getOffset(), Long.MAX_VALUE );

                BTree<?, ?> btree = BTreeFactory.<NameRevision, Long> createPersistedBTree();
                //( ( PersistedBTree<NameRevision, Long> ) btree ).setBtreeHeaderOffset( btreeOffset );
                loadBtree( btreePageIos, btree );

                // Add the btree into the map of managed B-trees
                managedBtrees.put( btreeName, ( BTree<Object, Object> ) btree );
                
                btreeOfBtrees.delete( btreeName );
                btreeOfBtrees.insert( btreeName, headerOffset );
            }

            // We are done ! Let's finish with the last initialization parts
            endOfFileOffset = fileChannel.size();
        }
    }


    /**
     * Starts a transaction
     */
    public void beginTransaction()
    {
        // First, take the lock
        transactionLock.lock();
        
        // Now, check the TLS state
        incrementTxnLevel();
    }


    /**
     * Commits a transaction
     */
    public void commit()
    {
        if ( !fileChannel.isOpen() )
        {
            // The file has been closed, nothing remains to commit, let's get out
            transactionLock.unlock();
            
            // Still we have to decrement the TransactionLevel
            decrementTxnLevel();
            
            return;
        }

        int nbTxnStarted = context.get();
        
        switch ( nbTxnStarted )
        {
            case ROLLBACKED_TXN :
                // The transaction was rollbacked, quit immediatelly
                transactionLock.unlock();
                
                return;
            
            case 1 :
                // We are done with the transaction, we can update the RMHeader and swap the BTreeHeaders
                // First update the RMHeader to be sure that we have a way to restore from a crash
                updateRecordManagerHeader();
                
                // Swap the BtreeHeaders maps
                swapCurrentBtreeHeaders();
        
                // We can now free pages
                for ( PageIO pageIo : freedPages )
                {
                    try
                    {
                        free( pageIo );
                    }
                    catch ( IOException ioe )
                    {
                        throw new RecordManagerException( ioe.getMessage() );
                    }
                }
        
                // Release the allocated and freed pages list
                freedPages.clear();
                allocatedPages.clear();
        
                // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers
                // here, we have to erase the old references to keep only the new ones.
                updateRecordManagerHeader();
                
                // And decrement the number of started transactions
                decrementTxnLevel();

                commitCount++;
                
                if( commitCount >= spaceReclaimerThreshold )
                {
                    runReclaimer();
                }
                
                // Finally, release the global lock
                transactionLock.unlock();
                
                return;
                
            default :
                // We are inner an existing transaction. Just update the necessary elements
                // Update the RMHeader to be sure that we have a way to restore from a crash
                updateRecordManagerHeader();
                
                // Swap the BtreeHeaders maps
                //swapCurrentBtreeHeaders();
        
                // We can now free pages
                for ( PageIO pageIo : freedPages )
                {
                    try
                    {
                        free( pageIo );
                    }
                    catch ( IOException ioe )
                    {
                        throw new RecordManagerException( ioe.getMessage() );
                    }
                }
        
                // Release the allocated and freed pages list
                freedPages.clear();
                allocatedPages.clear();
        
                // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers
                // here, we have to erase the old references to keep only the new ones.
                updateRecordManagerHeader();
                
                // And decrement the number of started transactions
                decrementTxnLevel();

                commitCount++;
                
                if( commitCount >= spaceReclaimerThreshold )
                {
                    runReclaimer();
                }

                // Finally, release the global lock
                transactionLock.unlock();
                
                return;
        }
    }
    
    
    public boolean isContextOk()
    {
        return ( context == null ? true : ( context.get() == 0 ) );
    }
    
    /**
     * Increment the transactionLevel
     */
    private void incrementTxnLevel()
    {
        Integer nbTxnLevel = context.get();
        
        if ( nbTxnLevel == null )
        {
            context.set( 1 );
        }
        else
        {
            // And increment the counter of inner txn.
            context.set( nbTxnLevel + 1 );
        }
        
        /*
        System.out.println( "Incrementing : " + context.get() );
        
        if ( context.get() == 0 )
        {
            System.out.println( "-------------" );
        }
        */
    }
    
    
    /**
     * Decrement the transactionLevel
     */
    private void decrementTxnLevel()
    {
        int nbTxnStarted = context.get();

        context.set(  nbTxnStarted - 1 );
        
        //System.out.println( "Incrementing : " + context.get() );
    }


    /**
     * Rollback a transaction
     */
    public void rollback()
    {
        // Reset the counter
        context.set( ROLLBACKED_TXN );

        // We can now free allocated pages, this is the end of the transaction
        for ( PageIO pageIo : allocatedPages )
        {
            try
            {
                free( pageIo );
            }
            catch ( IOException ioe )
            {
                throw new RecordManagerException( ioe.getMessage() );
            }
        }

        // Release the allocated and freed pages list
        freedPages.clear();
        allocatedPages.clear();

        // And update the RMHeader
        updateRecordManagerHeader();
        
        // And restore the BTreeHeaders new Map to the current state
        revertBtreeHeaders();

        transactionLock.unlock();
    }


    /**
     * Reads all the PageIOs that are linked to the page at the given position, including
     * the first page.
     *
     * @param position The position of the first page
     * @param limit The maximum bytes to read. Set this value to -1 when the size is unknown.
     * @return An array of pages
     */
    /*no qualifier*/ PageIO[] readPageIOs( long position, long limit ) throws IOException, EndOfFileExceededException
    {
        LOG.debug( "Read PageIOs at position {}", position );

        if ( limit <= 0 )
        {
            limit = Long.MAX_VALUE;
        }

        PageIO firstPage = fetchPage( position );
        
        if( firstPage.isFree() )
        {
            System.out.println("------------- NOT freeing free page ---------------");
            return new PageIO[]{};
        }
        else
        {
            firstPage.setSize();
        }
        
        List<PageIO> listPages = new ArrayList<PageIO>();
        listPages.add( firstPage );
        long dataRead = pageSize - LONG_SIZE - INT_SIZE;

        // Iterate on the pages, if needed
        long nextPage = firstPage.getNextPage();

        if ( ( dataRead < limit ) && ( nextPage != NO_PAGE ) )
        {
            while ( dataRead < limit )
            {
                PageIO page = fetchPage( nextPage );
                listPages.add( page );
                nextPage = page.getNextPage();
                dataRead += pageSize - LONG_SIZE;

                if ( nextPage == NO_PAGE )
                {
                    page.setNextPage( NO_PAGE );
                    break;
                }
            }
        }

        LOG.debug( "Nb of PageIOs read : {}", listPages.size() );

        // Return
        return listPages.toArray( new PageIO[]
            {} );
    }


    /**
     * Check the offset to be sure it's a valid one :
     * <ul>
     * <li>It's >= 0</li>
     * <li>It's below the end of the file</li>
     * <li>It's a multiple of the pageSize
     * </ul>
     * @param offset The offset to check
     * @throws InvalidOffsetException If the offset is not valid
     */
    /* no qualifier */ void checkOffset( long offset )
    {
        if ( ( offset < 0 ) || ( offset > endOfFileOffset ) || ( ( offset % pageSize ) != 0 ) )
        {
            throw new InvalidOffsetException( "Bad Offset : " + offset );
        }
    }


    /**
     * Read a B-tree from the disk. The meta-data are at the given position in the list of pages.
     * We load a B-tree in two steps : first, we load the B-tree header, then the common informations
     *
     * @param pageIos The list of pages containing the meta-data
     * @param btree The B-tree we have to initialize
     * @throws InstantiationException
     * @throws IllegalAccessException
     * @throws ClassNotFoundException
     * @throws NoSuchFieldException
     * @throws SecurityException
     * @throws IllegalArgumentException
     */
    private <K, V> void loadBtree( PageIO[] pageIos, BTree<K, V> btree ) throws EndOfFileExceededException,
        IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException, SecurityException, NoSuchFieldException
    {
        loadBtree( pageIos, btree, null );
    }


    /**
     * Read a B-tree from the disk. The meta-data are at the given position in the list of pages.
     * We load a B-tree in two steps : first, we load the B-tree header, then the common informations
     *
     * @param pageIos The list of pages containing the meta-data
     * @param btree The B-tree we have to initialize
     * @throws InstantiationException
     * @throws IllegalAccessException
     * @throws ClassNotFoundException
     * @throws NoSuchFieldException
     * @throws SecurityException
     * @throws IllegalArgumentException
     */
    /* no qualifier */ <K, V> void loadBtree( PageIO[] pageIos, BTree btree, BTree<K, V> parentBTree ) throws EndOfFileExceededException,
        IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException, SecurityException, NoSuchFieldException
    {
        long dataPos = 0L;

        // Process the B-tree header
        BTreeHeader<K, V> btreeHeader = new BTreeHeader<K, V>();
        btreeHeader.setBtree( btree );

        // The BtreeHeader offset
        btreeHeader.setBTreeHeaderOffset( pageIos[0].getOffset() );

        // The B-tree current revision
        long revision = readLong( pageIos, dataPos );
        btreeHeader.setRevision( revision );
        dataPos += LONG_SIZE;

        // The nb elems in the tree
        long nbElems = readLong( pageIos, dataPos );
        btreeHeader.setNbElems( nbElems );
        dataPos += LONG_SIZE;

        // The B-tree rootPage offset
        long rootPageOffset = readLong( pageIos, dataPos );
        btreeHeader.setRootPageOffset( rootPageOffset );
        dataPos += LONG_SIZE;

        // The B-tree information offset
        long btreeInfoOffset = readLong( pageIos, dataPos );

        // Now, process the common informations
        PageIO[] infoPageIos = readPageIOs( btreeInfoOffset, Long.MAX_VALUE );
        ((PersistedBTree<K, V>)btree).setBtreeInfoOffset( infoPageIos[0].getOffset() );
        dataPos = 0L;

        // The B-tree page size
        int btreePageSize = readInt( infoPageIos, dataPos );
        BTreeFactory.setPageSize( btree, btreePageSize );
        dataPos += INT_SIZE;

        // The tree name
        ByteBuffer btreeNameBytes = readBytes( infoPageIos, dataPos );
        dataPos += INT_SIZE + btreeNameBytes.limit();
        String btreeName = Strings.utf8ToString( btreeNameBytes );
        BTreeFactory.setName( btree, btreeName );

        // The keySerializer FQCN
        ByteBuffer keySerializerBytes = readBytes( infoPageIos, dataPos );
        dataPos += INT_SIZE + keySerializerBytes.limit();

        String keySerializerFqcn = "";

        if ( keySerializerBytes != null )
        {
            keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
        }

        BTreeFactory.setKeySerializer( btree, keySerializerFqcn );

        // The valueSerialier FQCN
        ByteBuffer valueSerializerBytes = readBytes( infoPageIos, dataPos );

        String valueSerializerFqcn = "";
        dataPos += INT_SIZE + valueSerializerBytes.limit();

        if ( valueSerializerBytes != null )
        {
            valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
        }

        BTreeFactory.setValueSerializer( btree, valueSerializerFqcn );

        // The B-tree allowDuplicates flag
        int allowDuplicates = readInt( infoPageIos, dataPos );
        ( ( PersistedBTree<K, V> ) btree ).setAllowDuplicates( allowDuplicates != 0 );
        dataPos += INT_SIZE;

        // Set the recordManager in the btree
        ( ( PersistedBTree<K, V> ) btree ).setRecordManager( this );

        // Set the current revision to the one stored in the B-tree header
        // Here, we have to tell the BTree to keep this revision in the
        // btreeRevisions Map, thus the 'true' parameter at the end.
        ((PersistedBTree<K, V>)btree).storeRevision( btreeHeader, true );

        // Now, init the B-tree
        ( ( PersistedBTree<K, V> ) btree ).init( parentBTree );
        
        // Update the BtreeHeaders Maps
        currentBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() );
        newBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() );

        // Read the rootPage pages on disk
        PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );

        Page<K, V> btreeRoot = readPage( btree, rootPageIos );
        BTreeFactory.setRecordManager( btree, this );

        BTreeFactory.setRootPage( btree, btreeRoot );
    }


    /**
     * Deserialize a Page from a B-tree at a give position
     *
     * @param btree The B-tree we want to read a Page from
     * @param offset The position in the file for this page
     * @return The read page
     * @throws EndOfFileExceededException If we have reached the end of the file while reading the page
     */
    public <K, V> Page<K, V> deserialize( BTree<K, V> btree, long offset ) throws EndOfFileExceededException,
        IOException
    {
        checkOffset( offset );
        PageIO[] rootPageIos = readPageIOs( offset, Long.MAX_VALUE );

        Page<K, V> page = readPage( btree, rootPageIos );

        return page;
    }


    /**
     * Read a page from some PageIO for a given B-tree
     * @param btree The B-tree we want to read a page for
     * @param pageIos The PageIO containing the raw data
     * @return The read Page if successful
     * @throws IOException If the deserialization failed
     */
    private <K, V> Page<K, V> readPage( BTree<K, V> btree, PageIO[] pageIos ) throws IOException
    {
        // Deserialize the rootPage now
        long position = 0L;

        // The revision
        long revision = readLong( pageIos, position );
        position += LONG_SIZE;

        // The number of elements in the page
        int nbElems = readInt( pageIos, position );
        position += INT_SIZE;

        // The size of the data containing the keys and values
        Page<K, V> page = null;

        // Reads the bytes containing all the keys and values, if we have some
        // We read  big blog of data into  ByteBuffer, then we will process
        // this ByteBuffer
        ByteBuffer byteBuffer = readBytes( pageIos, position );

        // Now, deserialize the data block. If the number of elements
        // is positive, it's a Leaf, otherwise it's a Node
        // Note that only a leaf can have 0 elements, and it's the root page then.
        if ( nbElems >= 0 )
        {
            // It's a leaf
            page = readLeafKeysAndValues( btree, nbElems, revision, byteBuffer, pageIos );
        }
        else
        {
            // It's a node
            page = readNodeKeysAndValues( btree, -nbElems, revision, byteBuffer, pageIos );
        }

        ( ( AbstractPage<K, V> ) page ).setOffset( pageIos[0].getOffset() );
        if ( pageIos.length > 1 )
        {
            ( ( AbstractPage<K, V> ) page ).setLastOffset( pageIos[pageIos.length - 1].getOffset() );
        }

        return page;
    }


    /**
     * Deserialize a Leaf from some PageIOs
     */
    private <K, V> PersistedLeaf<K, V> readLeafKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
        ByteBuffer byteBuffer, PageIO[] pageIos )
    {
        // Its a leaf, create it
        PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) BTreeFactory.createLeaf( btree, revision, nbElems );

        // Store the page offset on disk
        leaf.setOffset( pageIos[0].getOffset() );
        leaf.setLastOffset( pageIos[pageIos.length - 1].getOffset() );

        int[] keyLengths = new int[nbElems];
        int[] valueLengths = new int[nbElems];

        boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );

        // Read each key and value
        for ( int i = 0; i < nbElems; i++ )
        {
            if ( isNotSubTree )
            {
                // Read the number of values
                int nbValues = byteBuffer.getInt();
                PersistedValueHolder<V> valueHolder = null;
                
                if ( nbValues < 0 )
                {
                    // This is a sub-btree
                    byte[] btreeOffsetBytes = new byte[LONG_SIZE];
                    byteBuffer.get( btreeOffsetBytes );
                    
                    // Create the valueHolder. As the number of values is negative, we have to switch
                    // to a positive value but as we start at -1 for 0 value, add 1.
                    valueHolder = new PersistedValueHolder<V>( btree, 1 - nbValues, btreeOffsetBytes );
                }
                else
                {
                    // This is an array
                    // Read the value's array length
                    valueLengths[i] = byteBuffer.getInt();
                    
                    // This is an Array of values, read the byte[] associated with it
                    byte[] arrayBytes = new byte[valueLengths[i]];
                    byteBuffer.get( arrayBytes );
                    valueHolder = new PersistedValueHolder<V>( btree, nbValues, arrayBytes );
                }
                
                BTreeFactory.setValue( btree, leaf, i, valueHolder );
            }

            keyLengths[i] = byteBuffer.getInt();
            byte[] data = new byte[keyLengths[i]];
            byteBuffer.get( data );
            BTreeFactory.setKey( btree, leaf, i, data );
        }

        return leaf;
    }


    /**
     * Deserialize a Node from some PageIos
     */
    private <K, V> PersistedNode<K, V> readNodeKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
        ByteBuffer byteBuffer, PageIO[] pageIos ) throws IOException
    {
        PersistedNode<K, V> node = ( PersistedNode<K, V> ) BTreeFactory.createNode( btree, revision, nbElems );

        // Read each value and key
        for ( int i = 0; i < nbElems; i++ )
        {
            // This is an Offset
            long offset = LongSerializer.INSTANCE.deserialize( byteBuffer );
            long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer );

            PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
            node.setValue( i, valueHolder );

            // Read the key length
            int keyLength = byteBuffer.getInt();

            int currentPosition = byteBuffer.position();

            // and the key value
            K key = btree.getKeySerializer().deserialize( byteBuffer );

            // Set the new position now
            byteBuffer.position( currentPosition + keyLength );

            BTreeFactory.setKey( btree, node, i, key );
        }

        // and read the last value, as it's a node
        long offset = LongSerializer.INSTANCE.deserialize( byteBuffer );
        long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer );

        PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
        node.setValue( nbElems, valueHolder );

        return node;
    }


    /**
     * Read a byte[] from pages.
     *
     * @param pageIos The pages we want to read the byte[] from
     * @param position The position in the data stored in those pages
     * @return The byte[] we have read
     */
    /* no qualifier */ ByteBuffer readBytes( PageIO[] pageIos, long position )
    {
        // Read the byte[] length first
        int length = readInt( pageIos, position );
        position += INT_SIZE;

        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        ByteBuffer pageData = pageIos[pageNb].getData();
        int remaining = pageData.capacity() - pagePos;

        if ( length == 0 )
        {
            // No bytes to read : return null;
            return null;
        }
        else
        {
            ByteBuffer bytes = ByteBuffer.allocate( length );

            while ( length > 0 )
            {
                if ( length <= remaining )
                {
                    pageData.mark();
                    pageData.position( pagePos );
                    int oldLimit = pageData.limit();
                    pageData.limit( pagePos + length );
                    bytes.put( pageData );
                    pageData.limit( oldLimit );
                    pageData.reset();
                    bytes.rewind();

                    return bytes;
                }

                pageData.mark();
                pageData.position( pagePos );
                int oldLimit = pageData.limit();
                pageData.limit( pagePos + remaining );
                bytes.put( pageData );
                pageData.limit( oldLimit );
                pageData.reset();
                pageNb++;
                pagePos = LINK_SIZE;
                pageData = pageIos[pageNb].getData();
                length -= remaining;
                remaining = pageData.capacity() - pagePos;
            }

            bytes.rewind();

            return bytes;
        }
    }


    /**
     * Read an int from pages
     * @param pageIos The pages we want to read the int from
     * @param position The position in the data stored in those pages
     * @return The int we have read
     */
    /* no qualifier */ int readInt( PageIO[] pageIos, long position )
    {
        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        ByteBuffer pageData = pageIos[pageNb].getData();
        int remaining = pageData.capacity() - pagePos;
        int value = 0;

        if ( remaining >= INT_SIZE )
        {
            value = pageData.getInt( pagePos );
        }
        else
        {
            value = 0;

            switch ( remaining )
            {
                case 3:
                    value += ( ( pageData.get( pagePos + 2 ) & 0x00FF ) << 8 );
                    // Fallthrough !!!

                case 2:
                    value += ( ( pageData.get( pagePos + 1 ) & 0x00FF ) << 16 );
                    // Fallthrough !!!

                case 1:
                    value += ( pageData.get( pagePos ) << 24 );
                    break;
            }

            // Now deal with the next page
            pageData = pageIos[pageNb + 1].getData();
            pagePos = LINK_SIZE;

            switch ( remaining )
            {
                case 1:
                    value += ( pageData.get( pagePos ) & 0x00FF ) << 16;
                    // fallthrough !!!

                case 2:
                    value += ( pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 8;
                    // fallthrough !!!

                case 3:
                    value += ( pageData.get( pagePos + 3 - remaining ) & 0x00FF );
                    break;
            }
        }

        return value;
    }


    /**
     * Read a byte from pages
     * @param pageIos The pages we want to read the byte from
     * @param position The position in the data stored in those pages
     * @return The byte we have read
     */
    private byte readByte( PageIO[] pageIos, long position )
    {
        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        ByteBuffer pageData = pageIos[pageNb].getData();
        byte value = 0;

        value = pageData.get( pagePos );

        return value;
    }


    /**
     * Read a long from pages
     * @param pageIos The pages we want to read the long from
     * @param position The position in the data stored in those pages
     * @return The long we have read
     */
    /* no qualifier */ long readLong( PageIO[] pageIos, long position )
    {
        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        ByteBuffer pageData = pageIos[pageNb].getData();
        int remaining = pageData.capacity() - pagePos;
        long value = 0L;

        if ( remaining >= LONG_SIZE )
        {
            value = pageData.getLong( pagePos );
        }
        else
        {
            switch ( remaining )
            {
                case 7:
                    value += ( ( ( long ) pageData.get( pagePos + 6 ) & 0x00FF ) << 8 );
                    // Fallthrough !!!

                case 6:
                    value += ( ( ( long ) pageData.get( pagePos + 5 ) & 0x00FF ) << 16 );
                    // Fallthrough !!!

                case 5:
                    value += ( ( ( long ) pageData.get( pagePos + 4 ) & 0x00FF ) << 24 );
                    // Fallthrough !!!

                case 4:
                    value += ( ( ( long ) pageData.get( pagePos + 3 ) & 0x00FF ) << 32 );
                    // Fallthrough !!!

                case 3:
                    value += ( ( ( long ) pageData.get( pagePos + 2 ) & 0x00FF ) << 40 );
                    // Fallthrough !!!

                case 2:
                    value += ( ( ( long ) pageData.get( pagePos + 1 ) & 0x00FF ) << 48 );
                    // Fallthrough !!!

                case 1:
                    value += ( ( long ) pageData.get( pagePos ) << 56 );
                    break;
            }

            // Now deal with the next page
            pageData = pageIos[pageNb + 1].getData();
            pagePos = LINK_SIZE;

            switch ( remaining )
            {
                case 1:
                    value += ( ( long ) pageData.get( pagePos ) & 0x00FF ) << 48;
                    // fallthrough !!!

                case 2:
                    value += ( ( long ) pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 40;
                    // fallthrough !!!

                case 3:
                    value += ( ( long ) pageData.get( pagePos + 3 - remaining ) & 0x00FF ) << 32;
                    // fallthrough !!!

                case 4:
                    value += ( ( long ) pageData.get( pagePos + 4 - remaining ) & 0x00FF ) << 24;
                    // fallthrough !!!

                case 5:
                    value += ( ( long ) pageData.get( pagePos + 5 - remaining ) & 0x00FF ) << 16;
                    // fallthrough !!!

                case 6:
                    value += ( ( long ) pageData.get( pagePos + 6 - remaining ) & 0x00FF ) << 8;
                    // fallthrough !!!

                case 7:
                    value += ( ( long ) pageData.get( pagePos + 7 - remaining ) & 0x00FF );
                    break;
            }
        }

        return value;
    }


    /**
     * Manage a B-tree. The btree will be added and managed by this RecordManager. We will create a
     * new RootPage for this added B-tree, which will contain no data.<br/>
     * This method is threadsafe.
     *
     * @param btree The new B-tree to manage.
     * @throws BTreeAlreadyManagedException if the B-tree is already managed
     * @throws IOException if there was a problem while accessing the file
     */
    public synchronized <K, V> void manage( BTree<K, V> btree ) throws BTreeAlreadyManagedException, IOException
    {
        beginTransaction();

        manage( ( BTree<Object, Object> ) btree, NORMAL_BTREE );

        commit();
    }


    /**
     * Managing a btree is a matter of storing an reference to the managed B-tree in the B-tree Of B-trees.
     * We store a tuple of NameRevision (where revision is 0L) and a offset to the B-tree header.
     * At the same time, we keep a track of the managed B-trees in a Map.
     *
     * @param btree The new B-tree to manage.
     * @param treeType flag indicating if this is an internal tree
     *
     * @throws BTreeAlreadyManagedException If the B-tree is already managed
     * @throws IOException
     */
    public synchronized <K, V> void manage( BTree<K, V> btree, boolean treeType )
        throws BTreeAlreadyManagedException, IOException
    {
        LOG.debug( "Managing the btree {} which is an internam tree : {}", btree.getName(), treeType );
        BTreeFactory.setRecordManager( btree, this );

        String name = btree.getName();

        if ( managedBtrees.containsKey( name ) )
        {
            // There is already a B-tree with this name in the recordManager...
            LOG.error( "There is already a B-tree named '{}' managed by this recordManager", name );
            throw new BTreeAlreadyManagedException( name );
        }

        // Now, write the B-tree informations
        long btreeInfoOffset = writeBtreeInfo( btree );
        BTreeHeader<K, V> btreeHeader = ((AbstractBTree<K,V>)btree).getBtreeHeader();
        ((PersistedBTree<K, V>)btree).setBtreeInfoOffset( btreeInfoOffset );

        // Serialize the B-tree root page
        Page<K, V> rootPage = btreeHeader.getRootPage();

        PageIO[] rootPageIos = serializePage( btree, btreeHeader.getRevision(), rootPage );

        // Get the reference on the first page
        long rootPageOffset =  rootPageIos[0].getOffset();

        // Store the rootPageOffset into the Btree header and into the rootPage
        btreeHeader.setRootPageOffset( rootPageOffset );
        ( ( PersistedLeaf<K, V> ) rootPage ).setOffset( rootPageOffset );

        LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() );
        flushPages( rootPageIos );

        // And the B-tree header
        long btreeHeaderOffset = writeBtreeHeader( btree, btreeHeader );

        // Now, if this is a new B-tree, add it to the B-tree of B-trees
        if ( treeType != INTERNAL_BTREE )
        {
            // Add the btree into the map of managed B-trees
            managedBtrees.put( name, ( BTree<Object, Object> ) btree );
            
            // And in the Map of currentBtreeHeaders and newBtreeHeaders
            currentBTreeHeaders.put( name, btreeHeader );
            newBTreeHeaders.put( name, btreeHeader );

            // We can safely increment the number of managed B-trees
            nbBtree++;

            // Create the new RevisionAndHeaderOffset
            RevisionAndHeaderOffset revisionAndHeader = new RevisionAndHeaderOffset( 0L, btreeHeaderOffset );

            // Inject it into the B-tree of B-tree
            btreeOfBtrees.insert( name, revisionAndHeader );
        }
    }


    /**
     * Serialize a new Page. It will contain the following data :<br/>
     * <ul>
     * <li>the revision : a long</li>
     * <li>the number of elements : an int (if <= 0, it's a Node, otherwise it's a Leaf)</li>
     * <li>the size of the values/keys when serialized
     * <li>the keys : an array of serialized keys</li>
     * <li>the values : an array of references to the children pageIO offset (stored as long)
     * if it's a Node, or a list of values if it's a Leaf</li>
     * <li></li>
     * </ul>
     *
     * @param revision The node revision
     * @param keys The keys to serialize
     * @param children The references to the children
     * @return An array of pages containing the serialized node
     * @throws IOException
     */
    private <K, V> PageIO[] serializePage( BTree<K, V> btree, long revision, Page<K, V> page ) throws IOException
    {
        int nbElems = page.getNbElems();

        boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );
        
        if ( nbElems == 0 )
        {
            return serializeRootPage( revision );
        }
        else
        {
            // Prepare a list of byte[] that will contain the serialized page
            int nbBuffers = 1 + 1 + 1 + nbElems * 3;
            int dataSize = 0;
            int serializedSize = 0;

            if ( page.isNode() )
            {
                // A Node has one more value to store
                nbBuffers++;
            }

            // Now, we can create the list with the right size
            List<byte[]> serializedData = new ArrayList<byte[]>( nbBuffers );

            // The revision
            byte[] buffer = LongSerializer.serialize( revision );
            serializedData.add( buffer );
            serializedSize += buffer.length;

            // The number of elements
            // Make it a negative value if it's a Node
            int pageNbElems = nbElems;

            if ( page.isNode() )
            {
                pageNbElems = -nbElems;
            }

            buffer = IntSerializer.serialize( pageNbElems );
            serializedData.add( buffer );
            serializedSize += buffer.length;

            // Iterate on the keys and values. We first serialize the value, then the key
            // until we are done with all of them. If we are serializing a page, we have
            // to serialize one more value
            for ( int pos = 0; pos < nbElems; pos++ )
            {
                // Start with the value
                if ( page.isNode() )
                {
                    dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, pos, serializedData );
                    dataSize += serializeNodeKey( ( PersistedNode<K, V> ) page, pos, serializedData );
                }
                else
                {
                    if ( isNotSubTree )
                    {
                        dataSize += serializeLeafValue( ( PersistedLeaf<K, V> ) page, pos, serializedData );
                    }
                    
                    dataSize += serializeLeafKey( ( PersistedLeaf<K, V> ) page, pos, serializedData );
                }
            }

            // Nodes have one more value to serialize
            if ( page.isNode() )
            {
                dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, nbElems, serializedData );
            }

            // Store the data size
            buffer = IntSerializer.serialize( dataSize );
            serializedData.add( 2, buffer );
            serializedSize += buffer.length;

            serializedSize += dataSize;

            // We are done. Allocate the pages we need to store the data
            PageIO[] pageIos = getFreePageIOs( serializedSize );

            // And store the data into those pages
            long position = 0L;

            for ( byte[] bytes : serializedData )
            {
                position = storeRaw( position, bytes, pageIos );
            }

            return pageIos;
        }
    }


    /**
     * Serialize a Node's key
     */
    private <K, V> int serializeNodeKey( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
    {
        KeyHolder<K> holder = node.getKeyHolder( pos );
        byte[] buffer = ( ( PersistedKeyHolder<K> ) holder ).getRaw();

        // We have to store the serialized key length
        byte[] length = IntSerializer.serialize( buffer.length );
        serializedData.add( length );

        // And store the serialized key now if not null
        if ( buffer.length != 0 )
        {
            serializedData.add( buffer );
        }

        return buffer.length + INT_SIZE;
    }


    /**
     * Serialize a Node's Value. We store the two offsets of the child page.
     */
    private <K, V> int serializeNodeValue( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
        throws IOException
    {
        // For a node, we just store the children's offsets
        Page<K, V> child = node.getReference( pos );

        // The first offset
        byte[] buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getOffset() );
        serializedData.add( buffer );
        int dataSize = buffer.length;

        // The last offset
        buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getLastOffset() );
        serializedData.add( buffer );
        dataSize += buffer.length;

        return dataSize;
    }


    /**
     * Serialize a Leaf's key
     */
    private <K, V> int serializeLeafKey( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
    {
        int dataSize = 0;
        KeyHolder<K> keyHolder = leaf.getKeyHolder( pos );
        byte[] keyData = ( ( PersistedKeyHolder<K> ) keyHolder ).getRaw();

        if ( keyData != null )
        {
            // We have to store the serialized key length
            byte[] length = IntSerializer.serialize( keyData.length );
            serializedData.add( length );

            // And the key data
            serializedData.add( keyData );
            dataSize += keyData.length + INT_SIZE;
        }
        else
        {
            serializedData.add( IntSerializer.serialize( 0 ) );
            dataSize += INT_SIZE;
        }

        return dataSize;
    }


    /**
     * Serialize a Leaf's Value.
     */
    private <K, V> int serializeLeafValue( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
        throws IOException
    {
        // The value can be an Array or a sub-btree, but we don't care
        // we just iterate on all the values
        ValueHolder<V> valueHolder = leaf.getValue( pos );
        int dataSize = 0;
        int nbValues = valueHolder.size();

        if ( !valueHolder.isSubBtree() )
        {
            // Write the nb elements first
            byte[] buffer = IntSerializer.serialize( nbValues );
            serializedData.add( buffer );
            dataSize = INT_SIZE;

            // We have a serialized value. Just flush it
            byte[] data = ( ( PersistedValueHolder<V> ) valueHolder ).getRaw();
            dataSize += data.length;

            // Store the data size
            buffer = IntSerializer.serialize( data.length );
            serializedData.add( buffer );
            dataSize += INT_SIZE;

            // and add the data if it's not 0
            if ( data.length > 0 )
            {
                serializedData.add( data );
            }
        }
        else
        {
            if ( nbValues == 0 )
            {
                // No value.
                byte[] buffer = IntSerializer.serialize( nbValues );
                serializedData.add( buffer );

                return buffer.length;
            }

            if ( valueHolder.isSubBtree() )
            {
                // Store the nbVlues as a negative number. We add 1 so that 0 is not confused with an Array value
                byte[] buffer = IntSerializer.serialize( -( nbValues + 1 ) );
                serializedData.add( buffer );
                dataSize += buffer.length;

                // the B-tree offset
                buffer = LongSerializer.serialize( ( ( PersistedValueHolder<V> ) valueHolder ).getOffset() );
                serializedData.add( buffer );
                dataSize += buffer.length;
            }
            else
            {
                // This is an array, store the nb of values as a positive number
                byte[] buffer = IntSerializer.serialize( nbValues );
                serializedData.add( buffer );
                dataSize += buffer.length;

                // Now store each value
                byte[] data = ( ( PersistedValueHolder<V> ) valueHolder ).getRaw();
                buffer = IntSerializer.serialize( data.length );
                serializedData.add( buffer );
                dataSize += buffer.length;

                if ( data.length > 0 )
                {
                    serializedData.add( data );
                }

                dataSize += data.length;
            }
        }

        return dataSize;
    }


    /**
     * Write a root page with no elements in it
     */
    private PageIO[] serializeRootPage( long revision ) throws IOException
    {
        // We will have 1 single page if we have no elements
        PageIO[] pageIos = new PageIO[1];

        // This is either a new root page or a new page that will be filled later
        PageIO newPage = fetchNewPage();

        // We need first to create a byte[] that will contain all the data
        // For the root page, this is easy, as we only have to store the revision,
        // and the number of elements, which is 0.
        long position = 0L;

        position = store( position, revision, newPage );
        position = store( position, 0, newPage );

        // Update the page size now
        newPage.setSize( ( int ) position );

        // Insert the result into the array of PageIO
        pageIos[0] = newPage;

        return pageIos;
    }


    /**
     * Update the RecordManager header, injecting the following data :
     *
     * <pre>
     * +---------------------+
     * | PageSize            | 4 bytes : The size of a physical page (default to 4096)
     * +---------------------+
     * | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
     * +---------------------+
     * | FirstFree           | 8 bytes : The offset of the first free page
     * +---------------------+
     * | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
     * +---------------------+
     * | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
     * +---------------------+
     * | current CP offset   | 8 bytes : The offset of the current CopiedPages B-tree
     * +---------------------+
     * | previous CP offset  | 8 bytes : The offset of the previous CopiedPages B-tree
     * +---------------------+
     * </pre>
     */
    public void updateRecordManagerHeader()
    {
        // The page size
        int position = writeData( RECORD_MANAGER_HEADER_BYTES, 0, pageSize );

        // The number of managed B-tree
        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, nbBtree );

        // The first free page
        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, firstFreePage );

        // The offset of the current B-tree of B-trees
        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, currentBtreeOfBtreesOffset );

        // The offset of the copied pages B-tree
        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, previousBtreeOfBtreesOffset );

        // Write the RecordManager header on disk
        RECORD_MANAGER_HEADER_BUFFER.put( RECORD_MANAGER_HEADER_BYTES );
        RECORD_MANAGER_HEADER_BUFFER.flip();

        LOG.debug( "Update RM header" );

        if ( LOG_PAGES.isDebugEnabled() )
        {
            StringBuilder sb = new StringBuilder();

            sb.append( "First free page     : 0x" ).append( Long.toHexString( firstFreePage ) ).append( "\n" );
            sb.append( "Current BOB header  : 0x" ).append( Long.toHexString( currentBtreeOfBtreesOffset ) ).append( "\n" );
            sb.append( "Previous BOB header : 0x" ).append( Long.toHexString( previousBtreeOfBtreesOffset ) ).append( "\n" );

            if ( firstFreePage != NO_PAGE )
            {
                long freePage = firstFreePage;
                sb.append( "free pages list : " );

                boolean isFirst = true;

                while ( freePage != NO_PAGE )
                {
                    if ( isFirst )
                    {
                        isFirst = false;
                    }
                    else
                    {
                        sb.append( " -> " );
                    }

                    sb.append( "0x" ).append( Long.toHexString( freePage ) );

                    try
                    {
                        PageIO[] freePageIO = readPageIOs( freePage, 8 );

                        freePage = freePageIO[0].getNextPage();
                    }
                    catch ( EndOfFileExceededException e )
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    catch ( IOException e )
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }

            LOG_PAGES.debug( "Update RM Header : \n{}", sb.toString() );
        }

        try
        {
            fileChannel.write( RECORD_MANAGER_HEADER_BUFFER, 0 );
        }
        catch ( IOException ioe )
        {
            throw new FileException( ioe.getMessage() );
        }

        RECORD_MANAGER_HEADER_BUFFER.clear();

        // Reset the old versions
        previousBtreeOfBtreesOffset = -1L;

        nbUpdateRMHeader.incrementAndGet();
    }


    /**
     * Update the RecordManager header, injecting the following data :
     *
     * <pre>
     * +---------------------+
     * | PageSize            | 4 bytes : The size of a physical page (default to 4096)
     * +---------------------+
     * | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
     * +---------------------+
     * | FirstFree           | 8 bytes : The offset of the first free page
     * +---------------------+
     * | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
     * +---------------------+
     * | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
     * +---------------------+
     * | current CP offset   | 8 bytes : The offset of the current CopiedPages B-tree
     * +---------------------+
     * | previous CP offset  | 8 bytes : The offset of the previous CopiedPages B-tree
     * +---------------------+
     * </pre>
     */
    public void updateRecordManagerHeader( long newBtreeOfBtreesOffset, long newCopiedPageBtreeOffset )
    {
        if ( newBtreeOfBtreesOffset != -1L )
        {
            previousBtreeOfBtreesOffset = currentBtreeOfBtreesOffset;
            currentBtreeOfBtreesOffset = newBtreeOfBtreesOffset;
        }
    }


    /**
     * Inject an int into a byte[] at a given position.
     */
    private int writeData( byte[] buffer, int position, int value )
    {
        RECORD_MANAGER_HEADER_BYTES[position] = ( byte ) ( value >>> 24 );
        RECORD_MANAGER_HEADER_BYTES[position+1] = ( byte ) ( value >>> 16 );
        RECORD_MANAGER_HEADER_BYTES[position+2] = ( byte ) ( value >>> 8 );
        RECORD_MANAGER_HEADER_BYTES[position+3] = ( byte ) ( value );

        return position + 4;
    }


    /**
     * Inject a long into a byte[] at a given position.
     */
    private int writeData( byte[] buffer, int position, long value )
    {
        RECORD_MANAGER_HEADER_BYTES[position] = ( byte ) ( value >>> 56 );
        RECORD_MANAGER_HEADER_BYTES[position+1] = ( byte ) ( value >>> 48 );
        RECORD_MANAGER_HEADER_BYTES[position+2] = ( byte ) ( value >>> 40 );
        RECORD_MANAGER_HEADER_BYTES[position+3] = ( byte ) ( value >>> 32 );
        RECORD_MANAGER_HEADER_BYTES[position+4] = ( byte ) ( value >>> 24 );
        RECORD_MANAGER_HEADER_BYTES[position+5] = ( byte ) ( value >>> 16 );
        RECORD_MANAGER_HEADER_BYTES[position+6] = ( byte ) ( value >>> 8 );
        RECORD_MANAGER_HEADER_BYTES[position+7] = ( byte ) ( value );

        return position + 8;
    }


    /**
     * Add a new <btree, revision> tuple into the B-tree of B-trees.
     *
     * @param name The B-tree name
     * @param revision The B-tree revision
     * @param btreeHeaderOffset The B-tree offset
     * @throws IOException If the update failed
     */
    /* no qualifier */ <K, V> void addInBtreeOfBtrees( String name, long revision, long btreeHeaderOffset ) throws IOException
    {
        checkOffset( btreeHeaderOffset );
        RevisionAndHeaderOffset revisionAndHeader = new RevisionAndHeaderOffset( revision, btreeHeaderOffset );

        btreeOfBtrees.insert( name, revisionAndHeader );

        // Update the B-tree of B-trees offset
        currentBtreeOfBtreesOffset = getNewBTreeHeader( BTREE_OF_BTREES_NAME ).getBTreeHeaderOffset();
    }


    /**
     * Add a new <btree, revision> tuple into the CopiedPages B-tree.
     *
     * @param name The B-tree name
     * @param revision The B-tree revision
     * @param btreeHeaderOffset The B-tree offset
     * @throws IOException If the update failed
     */
    /* no qualifier */ <K, V> void addInCopiedPagesBtree( String name, long revision, List<Page<K, V>> pages ) throws IOException
    {
        RevisionName revisionName = new RevisionName( revision, name );

        long[] pageOffsets = new long[pages.size()];
        int pos = 0;

        for ( Page<K, V> page : pages )
        {
            pageOffsets[pos++] = ((AbstractPage<K, V>)page).getOffset();
        }

        copiedPageMap.put( revisionName, pageOffsets );
    }


    /**
     * Internal method used to update the B-tree of B-trees offset
     * @param btreeOfBtreesOffset The new offset
     */
    /* no qualifier */ void setBtreeOfBtreesOffset( long btreeOfBtreesOffset )
    {
        checkOffset( btreeOfBtreesOffset );
        this.currentBtreeOfBtreesOffset = btreeOfBtreesOffset;
    }


    /**
     * Write the B-tree header on disk. We will write the following informations :
     * <pre>
     * +------------+
     * | revision   | The B-tree revision
     * +------------+
     * | nbElems    | The B-tree number of elements
     * +------------+
     * | rootPage   | The root page offset
     * +------------+
     * | BtreeInfo  | The B-tree info offset
     * +------------+
     * </pre>
     * @param btree The B-tree which header has to be written
     * @param btreeInfoOffset The offset of the B-tree informations
     * @return The B-tree header offset
     * @throws IOException If we weren't able to write the B-tree header
     */
    /* no qualifier */ <K, V> long writeBtreeHeader( BTree<K, V> btree, BTreeHeader<K, V> btreeHeader ) throws IOException
    {
        int bufferSize =
            LONG_SIZE +                     // The revision
            LONG_SIZE +                     // the number of element
            LONG_SIZE +                     // The root page offset
            LONG_SIZE;                      // The B-tree info page offset

        // Get the pageIOs we need to store the data. We may need more than one.
        PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize );

        // Store the B-tree header Offset into the B-tree
        long btreeHeaderOffset = btreeHeaderPageIos[0].getOffset();

        // Now store the B-tree data in the pages :
        // - the B-tree revision
        // - the B-tree number of elements
        // - the B-tree root page offset
        // - the B-tree info page offset
        // Starts at 0
        long position = 0L;

        // The B-tree current revision
        position = store( position, btreeHeader.getRevision(), btreeHeaderPageIos );

        // The nb elems in the tree
        position = store( position, btreeHeader.getNbElems(), btreeHeaderPageIos );


        // Now, we can inject the B-tree rootPage offset into the B-tree header
        position = store( position, btreeHeader.getRootPageOffset(), btreeHeaderPageIos );

        // The B-tree info page offset
        position = store( position, ((PersistedBTree<K, V>)btree).getBtreeInfoOffset(), btreeHeaderPageIos );

        // And flush the pages to disk now
        LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );

        if ( LOG_PAGES.isDebugEnabled() )
        {
            LOG_PAGES.debug( "Writing BTreeHeader revision {} for {}", btreeHeader.getRevision(), btree.getName() );
            StringBuilder sb = new StringBuilder();

            sb.append( "Offset : " ).append( Long.toHexString( btreeHeaderOffset ) ).append( "\n" );
            sb.append( "    Revision : " ).append( btreeHeader.getRevision() ).append( "\n" );
            sb.append( "    NbElems  : " ).append( btreeHeader.getNbElems() ).append( "\n" );
            sb.append( "    RootPage : 0x" ).append( Long.toHexString( btreeHeader.getRootPageOffset() ) ).append( "\n" );
            sb.append( "    Info     : 0x" ).append( Long.toHexString( ((PersistedBTree<K, V>)btree).getBtreeInfoOffset() ) ).append( "\n" );

            LOG_PAGES.debug( "Btree Header[{}]\n{}", btreeHeader.getRevision(), sb.toString() );
        }

        flushPages( btreeHeaderPageIos );

        btreeHeader.setBTreeHeaderOffset( btreeHeaderOffset );

        return btreeHeaderOffset;
    }


    /**
     * Write the B-tree informations on disk. We will write the following informations :
     * <pre>
     * +------------+
     * | pageSize   | The B-tree page size (ie, the number of elements per page max)
     * +------------+
     * | nameSize   | The B-tree name size
     * +------------+
     * | name       | The B-tree name
     * +------------+
     * | keySerSize | The keySerializer FQCN size
     * +------------+
     * | keySerFQCN | The keySerializer FQCN
     * +------------+
     * | valSerSize | The Value serializer FQCN size
     * +------------+
     * | valSerKQCN | The valueSerializer FQCN
     * +------------+
     * | dups       | The flags that tell if the dups are allowed
     * +------------+
     * </pre>
     * @param btree The B-tree which header has to be written
     * @return The B-tree header offset
     * @throws IOException If we weren't able to write the B-tree header
     */
    private <K, V> long writeBtreeInfo( BTree<K, V> btree ) throws IOException
    {
        // We will add the newly managed B-tree at the end of the header.
        byte[] btreeNameBytes = Strings.getBytesUtf8( btree.getName() );
        byte[] keySerializerBytes = Strings.getBytesUtf8( btree.getKeySerializerFQCN() );
        byte[] valueSerializerBytes = Strings.getBytesUtf8( btree.getValueSerializerFQCN() );

        int bufferSize =
            INT_SIZE +                      // The page size
            INT_SIZE +                      // The name size
            btreeNameBytes.length +         // The name
            INT_SIZE +                      // The keySerializerBytes size
            keySerializerBytes.length +     // The keySerializerBytes
            INT_SIZE +                      // The valueSerializerBytes size
            valueSerializerBytes.length +   // The valueSerializerBytes
            INT_SIZE;                       // The allowDuplicates flag

        // Get the pageIOs we need to store the data. We may need more than one.
        PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize );

        // Keep the B-tree header Offset into the B-tree
        long btreeInfoOffset = btreeHeaderPageIos[0].getOffset();

        // Now store the B-tree information data in the pages :
        // - the B-tree page size
        // - the B-tree name
        // - the keySerializer FQCN
        // - the valueSerializer FQCN
        // - the flags that tell if the dups are allowed
        // Starts at 0
        long position = 0L;

        // The B-tree page size
        position = store( position, btree.getPageSize(), btreeHeaderPageIos );

        // The tree name
        position = store( position, btreeNameBytes, btreeHeaderPageIos );

        // The keySerializer FQCN
        position = store( position, keySerializerBytes, btreeHeaderPageIos );

        // The valueSerialier FQCN
        position = store( position, valueSerializerBytes, btreeHeaderPageIos );

        // The allowDuplicates flag
        position = store( position, ( btree.isAllowDuplicates() ? 1 : 0 ), btreeHeaderPageIos );

        // And flush the pages to disk now
        LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );
        flushPages( btreeHeaderPageIos );

        return btreeInfoOffset;
    }


    /**
     * Update the B-tree header after a B-tree modification. This will make the latest modification
     * visible.<br/>
     * We update the following fields :
     * <ul>
     * <li>the revision</li>
     * <li>the number of elements</li>
     * <li>the B-tree root page offset</li>
     * </ul>
     * <br/>
     * As a result, a new version of the BtreHeader will be created, which will replace the previous
     * B-tree header
     * @param btree TheB-tree to update
     * @param btreeHeaderOffset The offset of the modified btree header
     * @return The offset of the new B-tree Header
     * @throws IOException If we weren't able to write the file on disk
     * @throws EndOfFileExceededException If we tried to write after the end of the file
     */
    /* no qualifier */ <K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset )
        throws EndOfFileExceededException, IOException
    {
        return updateBtreeHeader( btree, btreeHeaderOffset, false );
    }


    /**
     * Update the B-tree header after a B-tree modification. This will make the latest modification
     * visible.<br/>
     * We update the following fields :
     * <ul>
     * <li>the revision</li>
     * <li>the number of elements</li>
     * <li>the reference to the current B-tree revisions</li>
     * <li>the reference to the old B-tree revisions</li>
     * </ul>
     * <br/>
     * As a result, we new version of the BtreHeader will be created
     * @param btree The B-tree to update
     * @param btreeHeaderOffset The offset of the modified btree header
     * @return The offset of the new B-tree Header if it has changed (ie, when the onPlace flag is set to true)
     * @throws IOException
     * @throws EndOfFileExceededException
     */
    /* no qualifier */ <K, V> void updateBtreeHeaderOnPlace( BTree<K, V> btree, long btreeHeaderOffset )
        throws EndOfFileExceededException,
        IOException
    {
        updateBtreeHeader( btree, btreeHeaderOffset, true );
    }


    /**
     * Update the B-tree header after a B-tree modification. This will make the latest modification
     * visible.<br/>
     * We update the following fields :
     * <ul>
     * <li>the revision</li>
     * <li>the number of elements</li>
     * <li>the reference to the current B-tree revisions</li>
     * <li>the reference to the old B-tree revisions</li>
     * </ul>
     * <br/>
     * As a result, a new version of the BtreHeader will be created, which may replace the previous
     * B-tree header (if the onPlace flag is set to true) or a new set of pageIos will contain the new
     * version.
     *
     * @param btree The B-tree to update
     * @param rootPageOffset The offset of the modified rootPage
     * @param onPlace Tells if we modify the B-tree on place, or if we create a copy
     * @return The offset of the new B-tree Header if it has changed (ie, when the onPlace flag is set to true)
     * @throws EndOfFileExceededException If we tried to write after the end of the file
     * @throws IOException If tehre were some error while writing the data on disk
     */
    private <K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset, boolean onPlace )
        throws EndOfFileExceededException, IOException
    {
        // Read the pageIOs associated with this B-tree
        PageIO[] pageIos;
        long newBtreeHeaderOffset = NO_PAGE;
        long offset = ( ( PersistedBTree<K, V> ) btree ).getBtreeOffset();

        if ( onPlace )
        {
            // We just have to update the existing BTreeHeader
            long headerSize = LONG_SIZE + LONG_SIZE + LONG_SIZE;

            pageIos = readPageIOs( offset, headerSize );

            // Now, update the revision
            long position = 0;

            position = store( position, btree.getRevision(), pageIos );
            position = store( position, btree.getNbElems(), pageIos );
            position = store( position, btreeHeaderOffset, pageIos );

            // Write the pages on disk
            if ( LOG.isDebugEnabled() )
            {
                LOG.debug( "-----> Flushing the '{}' B-treeHeader", btree.getName() );
                LOG.debug( "  revision : " + btree.getRevision() + ", NbElems : " + btree.getNbElems() + ", btreeHeader offset : 0x"
                    + Long.toHexString( btreeHeaderOffset ) );
            }

            // Get new place on disk to store the modified BTreeHeader if it's not onPlace
            // Rewrite the pages at the same place
            LOG.debug( "Rewriting the B-treeHeader on place for B-tree " + btree.getName() );
            flushPages( pageIos );
        }
        else
        {
            // We have to read and copy the existing BTreeHeader and to create a new one
            pageIos = readPageIOs( offset, Long.MAX_VALUE );

            // Now, copy every read page
            PageIO[] newPageIOs = new PageIO[pageIos.length];
            int pos = 0;

            for ( PageIO pageIo : pageIos )
            {
                // Fetch a free page
                newPageIOs[pos] = fetchNewPage();

                // keep a track of the allocated and copied pages so that we can
                // free them when we do a commit or rollback, if the btree is an management one
                if ( btree.getType() == BTreeTypeEnum.BTREE_OF_BTREES )
                {
                    freedPages.add( pageIo );
                    allocatedPages.add( newPageIOs[pos] );
                }

                pageIo.copy( newPageIOs[pos] );

                if ( pos > 0 )
                {
                    newPageIOs[pos - 1].setNextPage( newPageIOs[pos].getOffset() );
                }

                pos++;
            }

            // store the new btree header offset
            // and update the revision
            long position = 0;

            position = store( position, btree.getRevision(), newPageIOs );
            position = store( position, btree.getNbElems(), newPageIOs );
            position = store( position, btreeHeaderOffset, newPageIOs );

            // Get new place on disk to store the modified BTreeHeader if it's not onPlace
            // Flush the new B-treeHeader on disk
            LOG.debug( "Rewriting the B-treeHeader on place for B-tree " + btree.getName() );
            flushPages( newPageIOs );

            newBtreeHeaderOffset = newPageIOs[0].getOffset();
        }

        nbUpdateBtreeHeader.incrementAndGet();

        if ( LOG_CHECK.isDebugEnabled() )
        {
            MavibotInspector.check( this );
        }

        return newBtreeHeaderOffset;
    }


    /**
     * Write the pages on disk, either at the end of the file, or at
     * the position they were taken from.
     *
     * @param pageIos The list of pages to write
     * @throws IOException If the write failed
     */
    private void flushPages( PageIO... pageIos ) throws IOException
    {
        if ( LOG.isDebugEnabled() )
        {
            for ( PageIO pageIo : pageIos )
            {
                dump( pageIo );
            }
        }

        for ( PageIO pageIo : pageIos )
        {
            pageIo.getData().rewind();

            if ( fileChannel.size() < ( pageIo.getOffset() + pageSize ) )
            {
                LOG.debug( "Adding a page at the end of the file" );
                // This is a page we have to add to the file
                fileChannel.write( pageIo.getData(), fileChannel.size() );
                //fileChannel.force( false );
            }
            else
            {
                LOG.debug( "Writing a page at position {}", pageIo.getOffset() );
                fileChannel.write( pageIo.getData(), pageIo.getOffset() );
                //fileChannel.force( false );
            }

            nbUpdatePageIOs.incrementAndGet();

            pageIo.getData().rewind();
        }
    }


    /**
     * Compute the page in which we will store data given an offset, when
     * we have a list of pages.
     *
     * @param offset The position in the data
     * @return The page number in which the offset will start
     */
    private int computePageNb( long offset )
    {
        long pageNb = 0;

        offset -= pageSize - LINK_SIZE - PAGE_DATA_SIZE_LENGTH;

        if ( offset < 0 )
        {
            return ( int ) pageNb;
        }

        pageNb = 1 + offset / ( pageSize - LINK_SIZE );

        return ( int ) pageNb;
    }


    /**
     * Stores a byte[] into one ore more pageIO (depending if the long is stored
     * across a boundary or not)
     *
     * @param position The position in a virtual byte[] if all the pages were contiguous
     * @param bytes The byte[] to serialize
     * @param pageIos The pageIOs we have to store the data in
     * @return The new offset
     */
    private long store( long position, byte[] bytes, PageIO... pageIos )
    {
        if ( bytes != null )
        {
            // Write the bytes length
            position = store( position, bytes.length, pageIos );

            // Compute the page in which we will store the data given the
            // current position
            int pageNb = computePageNb( position );

            // Get back the buffer in this page
            ByteBuffer pageData = pageIos[pageNb].getData();

            // Compute the position in the current page
            int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

            // Compute the remaining size in the page
            int remaining = pageData.capacity() - pagePos;
            int nbStored = bytes.length;

            // And now, write the bytes until we have none
            while ( nbStored > 0 )
            {
                if ( remaining > nbStored )
                {
                    pageData.mark();
                    pageData.position( pagePos );
                    pageData.put( bytes, bytes.length - nbStored, nbStored );
                    pageData.reset();
                    nbStored = 0;
                }
                else
                {
                    pageData.mark();
                    pageData.position( pagePos );
                    pageData.put( bytes, bytes.length - nbStored, remaining );
                    pageData.reset();
                    pageNb++;
                    pageData = pageIos[pageNb].getData();
                    pagePos = LINK_SIZE;
                    nbStored -= remaining;
                    remaining = pageData.capacity() - pagePos;
                }
            }

            // We are done
            position += bytes.length;
        }
        else
        {
            // No bytes : write 0 and return
            position = store( position, 0, pageIos );
        }

        return position;
    }


    /**
     * Stores a byte[] into one ore more pageIO (depending if the long is stored
     * across a boundary or not). We don't add the byte[] size, it's already present
     * in the received byte[].
     *
     * @param position The position in a virtual byte[] if all the pages were contiguous
     * @param bytes The byte[] to serialize
     * @param pageIos The pageIOs we have to store the data in
     * @return The new offset
     */
    private long storeRaw( long position, byte[] bytes, PageIO... pageIos )
    {
        if ( bytes != null )
        {
            // Compute the page in which we will store the data given the
            // current position
            int pageNb = computePageNb( position );

            // Get back the buffer in this page
            ByteBuffer pageData = pageIos[pageNb].getData();

            // Compute the position in the current page
            int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

            // Compute the remaining size in the page
            int remaining = pageData.capacity() - pagePos;
            int nbStored = bytes.length;

            // And now, write the bytes until we have none
            while ( nbStored > 0 )
            {
                if ( remaining > nbStored )
                {
                    pageData.mark();
                    pageData.position( pagePos );
                    pageData.put( bytes, bytes.length - nbStored, nbStored );
                    pageData.reset();
                    nbStored = 0;
                }
                else
                {
                    pageData.mark();
                    pageData.position( pagePos );
                    pageData.put( bytes, bytes.length - nbStored, remaining );
                    pageData.reset();
                    pageNb++;

                    if ( pageNb == pageIos.length )
                    {
                        // We can stop here : we have reach the end of the page
                        break;
                    }

                    pageData = pageIos[pageNb].getData();
                    pagePos = LINK_SIZE;
                    nbStored -= remaining;
                    remaining = pageData.capacity() - pagePos;
                }
            }

            // We are done
            position += bytes.length;
        }
        else
        {
            // No bytes : write 0 and return
            position = store( position, 0, pageIos );
        }

        return position;
    }


    /**
     * Stores an Integer into one ore more pageIO (depending if the int is stored
     * across a boundary or not)
     *
     * @param position The position in a virtual byte[] if all the pages were contiguous
     * @param value The int to serialize
     * @param pageIos The pageIOs we have to store the data in
     * @return The new offset
     */
    private long store( long position, int value, PageIO... pageIos )
    {
        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        // Get back the buffer in this page
        ByteBuffer pageData = pageIos[pageNb].getData();

        // Compute the remaining size in the page
        int remaining = pageData.capacity() - pagePos;

        if ( remaining < INT_SIZE )
        {
            // We have to copy the serialized length on two pages

            switch ( remaining )
            {
                case 3:
                    pageData.put( pagePos + 2, ( byte ) ( value >>> 8 ) );
                    // Fallthrough !!!

                case 2:
                    pageData.put( pagePos + 1, ( byte ) ( value >>> 16 ) );
                    // Fallthrough !!!

                case 1:
                    pageData.put( pagePos, ( byte ) ( value >>> 24 ) );
                    break;
            }

            // Now deal with the next page
            pageData = pageIos[pageNb + 1].getData();
            pagePos = LINK_SIZE;

            switch ( remaining )
            {
                case 1:
                    pageData.put( pagePos, ( byte ) ( value >>> 16 ) );
                    // fallthrough !!!

                case 2:
                    pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 8 ) );
                    // fallthrough !!!

                case 3:
                    pageData.put( pagePos + 3 - remaining, ( byte ) ( value ) );
                    break;
            }
        }
        else
        {
            // Store the value in the page at the selected position
            pageData.putInt( pagePos, value );
        }

        // Increment the position to reflect the addition of an Int (4 bytes)
        position += INT_SIZE;

        return position;
    }


    /**
     * Stores a Long into one ore more pageIO (depending if the long is stored
     * across a boundary or not)
     *
     * @param position The position in a virtual byte[] if all the pages were contiguous
     * @param value The long to serialize
     * @param pageIos The pageIOs we have to store the data in
     * @return The new offset
     */
    private long store( long position, long value, PageIO... pageIos )
    {
        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        // Get back the buffer in this page
        ByteBuffer pageData = pageIos[pageNb].getData();

        // Compute the remaining size in the page
        int remaining = pageData.capacity() - pagePos;

        if ( remaining < LONG_SIZE )
        {
            // We have to copy the serialized length on two pages

            switch ( remaining )
            {
                case 7:
                    pageData.put( pagePos + 6, ( byte ) ( value >>> 8 ) );
                    // Fallthrough !!!

                case 6:
                    pageData.put( pagePos + 5, ( byte ) ( value >>> 16 ) );
                    // Fallthrough !!!

                case 5:
                    pageData.put( pagePos + 4, ( byte ) ( value >>> 24 ) );
                    // Fallthrough !!!

                case 4:
                    pageData.put( pagePos + 3, ( byte ) ( value >>> 32 ) );
                    // Fallthrough !!!

                case 3:
                    pageData.put( pagePos + 2, ( byte ) ( value >>> 40 ) );
                    // Fallthrough !!!

                case 2:
                    pageData.put( pagePos + 1, ( byte ) ( value >>> 48 ) );
                    // Fallthrough !!!

                case 1:
                    pageData.put( pagePos, ( byte ) ( value >>> 56 ) );
                    break;
            }

            // Now deal with the next page
            pageData = pageIos[pageNb + 1].getData();
            pagePos = LINK_SIZE;

            switch ( remaining )
            {
                case 1:
                    pageData.put( pagePos, ( byte ) ( value >>> 48 ) );
                    // fallthrough !!!

                case 2:
                    pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 40 ) );
                    // fallthrough !!!

                case 3:
                    pageData.put( pagePos + 3 - remaining, ( byte ) ( value >>> 32 ) );
                    // fallthrough !!!

                case 4:
                    pageData.put( pagePos + 4 - remaining, ( byte ) ( value >>> 24 ) );
                    // fallthrough !!!

                case 5:
                    pageData.put( pagePos + 5 - remaining, ( byte ) ( value >>> 16 ) );
                    // fallthrough !!!

                case 6:
                    pageData.put( pagePos + 6 - remaining, ( byte ) ( value >>> 8 ) );
                    // fallthrough !!!

                case 7:
                    pageData.put( pagePos + 7 - remaining, ( byte ) ( value ) );
                    break;
            }
        }
        else
        {
            // Store the value in the page at the selected position
            pageData.putLong( pagePos, value );
        }

        // Increment the position to reflect the addition of an Long (8 bytes)
        position += LONG_SIZE;

        return position;
    }


    /**
     * Write the page in a serialized form.
     *
     * @param btree The persistedBtree we will create a new PageHolder for
     * @param newPage The page to write on disk
     * @param newRevision The page's revision
     * @return A PageHolder containing the copied page
     * @throws IOException If the page can't be written on disk
     */
    /* No qualifier*/<K, V> PageHolder<K, V> writePage( BTree<K, V> btree, Page<K, V> newPage,
        long newRevision ) throws IOException
    {
        // We first need to save the new page on disk
        PageIO[] pageIos = serializePage( btree, newRevision, newPage );

        if ( LOG_PAGES.isDebugEnabled() )
        {
            LOG_PAGES.debug( "Write data for '{}' btree", btree.getName()  );

            logPageIos( pageIos );
        }

        // Write the page on disk
        flushPages( pageIos );

        // Build the resulting reference
        long offset = pageIos[0].getOffset();
        long lastOffset = pageIos[pageIos.length - 1].getOffset();
        PersistedPageHolder<K, V> pageHolder = new PersistedPageHolder<K, V>( btree, newPage, offset,
            lastOffset );

        return pageHolder;
    }


    /* No qualifier */ static void logPageIos( PageIO[] pageIos )
    {
        int pageNb = 0;

        for ( PageIO pageIo : pageIos )
        {
            StringBuilder sb = new StringBuilder();
            sb.append( "PageIO[" ).append( pageNb ).append( "]:0x" );
            sb.append( Long.toHexString( pageIo.getOffset() ) ).append( "/");
            sb.append( pageIo.getSize() );
            pageNb++;

            ByteBuffer data = pageIo.getData();

            int position = data.position();
            int dataLength = (int)pageIo.getSize() + 12;
            
            if ( dataLength > data.limit() )
            {
                dataLength = data.limit();
            }
            
            byte[] bytes = new byte[dataLength];

            data.get( bytes );
            data.position( position );
            int pos = 0;

            for ( byte b : bytes )
            {
                int mod = pos%16;

                switch ( mod )
                {
                    case 0:
                        sb.append( "\n    " );
                        // No break
                    case 4:
                    case 8:
                    case 12:
                        sb.append( " " );
                    case 1:
                    case 2:
                    case 3:
                    case 5:
                    case 6:
                    case 7:
                    case 9:
                    case 10:
                    case 11:
                    case 13:
                    case 14:
                    case 15:
                        sb.append( Strings.dumpByte( b ) ).append( " " );
                }
                pos++;
            }

            LOG_PAGES.debug( sb.toString() );
        }
    }


    /**
     * Compute the number of pages needed to store some specific size of data.
     *
     * @param dataSize The size of the data we want to store in pages
     * @return The number of pages needed
     */
    private int computeNbPages( int dataSize )
    {
        if ( dataSize <= 0 )
        {
            return 0;
        }

        // Compute the number of pages needed.
        // Considering that each page can contain PageSize bytes,
        // but that the first 8 bytes are used for links and we
        // use 4 bytes to store the data size, the number of needed
        // pages is :
        // NbPages = ( (dataSize - (PageSize - 8 - 4 )) / (PageSize - 8) ) + 1
        // NbPages += ( if (dataSize - (PageSize - 8 - 4 )) % (PageSize - 8) > 0 : 1 : 0 )
        int availableSize = ( pageSize - LONG_SIZE );
        int nbNeededPages = 1;

        // Compute the number of pages that will be full but the first page
        if ( dataSize > availableSize - INT_SIZE )
        {
            int remainingSize = dataSize - ( availableSize - INT_SIZE );
            nbNeededPages += remainingSize / availableSize;
            int remain = remainingSize % availableSize;

            if ( remain > 0 )
            {
                nbNeededPages++;
            }
        }

        return nbNeededPages;
    }


    /**
     * Get as many pages as needed to store the data of the given size. The returned
     * PageIOs are all linked together.
     *
     * @param dataSize The data size
     * @return An array of pages, enough to store the full data
     */
    private PageIO[] getFreePageIOs( int dataSize ) throws IOException
    {
        if ( dataSize == 0 )
        {
            return new PageIO[]
                {};
        }

        int nbNeededPages = computeNbPages( dataSize );

        PageIO[] pageIOs = new PageIO[nbNeededPages];

        // The first page : set the size
        pageIOs[0] = fetchNewPage();
        pageIOs[0].setSize( dataSize );

        for ( int i = 1; i < nbNeededPages; i++ )
        {
            pageIOs[i] = fetchNewPage();

            // Create the link
            pageIOs[i - 1].setNextPage( pageIOs[i].getOffset() );
        }

        return pageIOs;
    }


    /**
     * Return a new Page. We take one of the existing free pages, or we create
     * a new page at the end of the file.
     *
     * @return The fetched PageIO
     */
    private PageIO fetchNewPage() throws IOException
    {
        //dumpFreePages( firstFreePage );

        if ( firstFreePage == NO_PAGE )
        {
            nbCreatedPages.incrementAndGet();

            // We don't have any free page. Reclaim some new page at the end
            // of the file
            PageIO newPage = new PageIO( endOfFileOffset );

            endOfFileOffset += pageSize;

            ByteBuffer data = ByteBuffer.allocateDirect( pageSize );

            newPage.setData( data );
            newPage.setNextPage( NO_PAGE );
            newPage.setSize( 0 );

            LOG.debug( "Requiring a new page at offset {}", newPage.getOffset() );

            return newPage;
        }
        else
        {
            nbReusedPages.incrementAndGet();

            freePageLock.lock();
            
            // We have some existing free page. Fetch it from disk
            PageIO pageIo = fetchPage( firstFreePage );

            // Update the firstFreePage pointer
            firstFreePage = pageIo.getNextPage();

            freePageLock.unlock();
            
            // overwrite the data of old page
            ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
            pageIo.setData( data );

            pageIo.setNextPage( NO_PAGE );
            pageIo.setSize( 0 );

            LOG.debug( "Reused page at offset {}", pageIo.getOffset() );

            return pageIo;
        }
    }


    /**
     * fetch a page from disk, knowing its position in the file.
     *
     * @param offset The position in the file
     * @return The found page
     */
    /* no qualifier */ PageIO fetchPage( long offset ) throws IOException, EndOfFileExceededException
    {
        checkOffset( offset );

        if ( fileChannel.size() < offset + pageSize )
        {
            // Error : we are past the end of the file
            throw new EndOfFileExceededException( "We are fetching a page on " + offset +
                " when the file's size is " + fileChannel.size() );
        }
        else
        {
            // Read the page
            fileChannel.position( offset );

            ByteBuffer data = ByteBuffer.allocate( pageSize );
            fileChannel.read( data );
            data.rewind();

            PageIO readPage = new PageIO( offset );
            readPage.setData( data );

            return readPage;
        }
    }


    /**
     * @return the pageSize
     */
    public int getPageSize()
    {
        return pageSize;
    }


    /**
     * Set the page size, ie the number of bytes a page can store.
     *
     * @param pageSize The number of bytes for a page
     */
    /* no qualifier */ void setPageSize( int pageSize )
    {
        if ( this.pageSize >= 13 )
        {
            this.pageSize = pageSize;
        }
        else
        {
            this.pageSize = DEFAULT_PAGE_SIZE;
        }
    }


    /**
     * Close the RecordManager and flush everything on disk
     */
    public void close() throws IOException
    {
        beginTransaction();

        // Close all the managed B-trees
        for ( BTree<Object, Object> tree : managedBtrees.values() )
        {
            tree.close();
        }

        // Close the management B-trees
        btreeOfBtrees.close();

        managedBtrees.clear();

        // Write the data
        fileChannel.force( true );

        // And close the channel
        fileChannel.close();

        reclaimer.storeCopiedPageMap( file.getParentFile() );
        
        commit();
    }


    /** Hex chars */
    private static final byte[] HEX_CHAR = new byte[]
        { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };


    public static String dump( byte octet )
    {
        return new String( new byte[]
            { HEX_CHAR[( octet & 0x00F0 ) >> 4], HEX_CHAR[octet & 0x000F] } );
    }


    /**
     * Dump a pageIO
     */
    private void dump( PageIO pageIo )
    {
        ByteBuffer buffer = pageIo.getData();
        buffer.mark();
        byte[] longBuffer = new byte[LONG_SIZE];
        byte[] intBuffer = new byte[INT_SIZE];

        // get the next page offset
        buffer.get( longBuffer );
        long nextOffset = LongSerializer.deserialize( longBuffer );

        // Get the data size
        buffer.get( intBuffer );
        int size = IntSerializer.deserialize( intBuffer );

        buffer.reset();

        System.out.println( "PageIO[" + Long.toHexString( pageIo.getOffset() ) + "], size = " + size + ", NEXT PageIO:"
            + Long.toHexString( nextOffset ) );
        System.out.println( " 0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F " );
        System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );

        for ( int i = 0; i < buffer.limit(); i += 16 )
        {
            System.out.print( "|" );

            for ( int j = 0; j < 16; j++ )
            {
                System.out.print( dump( buffer.get() ) );

                if ( j == 15 )
                {
                    System.out.println( "|" );
                }
                else
                {
                    System.out.print( " " );
                }
            }
        }

        System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );

        buffer.reset();
    }


    /**
     * Dump the RecordManager file
     * @throws IOException
     */
    public void dump()
    {
        System.out.println( "/---------------------------- Dump ----------------------------\\" );

        try
        {
            RandomAccessFile randomFile = new RandomAccessFile( file, "r" );
            FileChannel fileChannel = randomFile.getChannel();

            ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE );

            // load the RecordManager header
            fileChannel.read( recordManagerHeader );

            recordManagerHeader.rewind();

            // The page size
            long fileSize = fileChannel.size();
            int pageSize = recordManagerHeader.getInt();
            long nbPages = fileSize / pageSize;

            // The number of managed B-trees
            int nbBtree = recordManagerHeader.getInt();

            // The first free page
            long firstFreePage = recordManagerHeader.getLong();

            // The current B-tree of B-trees
            long currentBtreeOfBtreesPage = recordManagerHeader.getLong();

            // The previous B-tree of B-trees
            long previousBtreeOfBtreesPage = recordManagerHeader.getLong();

            // The current CopiedPages B-tree
            long currentCopiedPagesBtreePage = recordManagerHeader.getLong();

            // The previous CopiedPages B-tree
            long previousCopiedPagesBtreePage = recordManagerHeader.getLong();

            System.out.println( "  RecordManager" );
            System.out.println( "  -------------" );
            System.out.println( "  Size = 0x" + Long.toHexString( fileSize ) );
            System.out.println( "  NbPages = " + nbPages );
            System.out.println( "    Header " );
            System.out.println( "      page size : " + pageSize );
            System.out.println( "      nbTree : " + nbBtree );
            System.out.println( "      firstFreePage : 0x" + Long.toHexString( firstFreePage ) );
            System.out.println( "      current BOB : 0x" + Long.toHexString( currentBtreeOfBtreesPage ) );
            System.out.println( "      previous BOB : 0x" + Long.toHexString( previousBtreeOfBtreesPage ) );
            System.out.println( "      current CopiedPages : 0x" + Long.toHexString( currentCopiedPagesBtreePage ) );
            System.out.println( "      previous CopiedPages : 0x" + Long.toHexString( previousCopiedPagesBtreePage ) );

            // Dump the Free pages list
            dumpFreePages( firstFreePage );

            // Dump the B-tree of B-trees
            dumpBtreeHeader( currentBtreeOfBtreesPage );

            // Dump the previous B-tree of B-trees if any
            if ( previousBtreeOfBtreesPage != NO_PAGE )
            {
                dumpBtreeHeader( previousBtreeOfBtreesPage );
            }

            // Dump the CopiedPages B-tree
            dumpBtreeHeader( currentCopiedPagesBtreePage );


            // Dump the previous B-tree of B-trees if any
            if ( previousCopiedPagesBtreePage != NO_PAGE )
            {
                dumpBtreeHeader( previousCopiedPagesBtreePage );
            }

            // Dump all the user's B-tree
            randomFile.close();
            System.out.println( "\\---------------------------- Dump ----------------------------/" );
        }
        catch ( IOException ioe )
        {
            System.out.println( "Exception while dumping the file : " + ioe.getMessage() );
        }
    }


    /**
     * Dump the free pages
     */
    private void dumpFreePages( long freePageOffset ) throws EndOfFileExceededException, IOException
    {
        System.out.println( "\n  FreePages : " );
        int pageNb = 1;

        while ( freePageOffset != NO_PAGE )
        {
            PageIO pageIo = fetchPage( freePageOffset );

            System.out.println( "    freePage[" + pageNb + "] : 0x" + Long.toHexString( pageIo.getOffset() ) );

            freePageOffset = pageIo.getNextPage();
            pageNb++;
        }
    }


    /**
     * Dump a B-tree Header
     */
    private long dumpBtreeHeader( long btreeOffset ) throws EndOfFileExceededException, IOException
    {
        // First read the B-tree header
        PageIO[] pageIos = readPageIOs( btreeOffset, Long.MAX_VALUE );

        long dataPos = 0L;

        // The B-tree current revision
        long revision = readLong( pageIos, dataPos );
        dataPos += LONG_SIZE;

        // The nb elems in the tree
        long nbElems = readLong( pageIos, dataPos );
        dataPos += LONG_SIZE;

        // The B-tree rootPage offset
        long rootPageOffset = readLong( pageIos, dataPos );
        dataPos += LONG_SIZE;

        // The B-tree page size
        int btreePageSize = readInt( pageIos, dataPos );
        dataPos += INT_SIZE;

        // The tree name
        ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos );
        dataPos += INT_SIZE + btreeNameBytes.limit();
        String btreeName = Strings.utf8ToString( btreeNameBytes );

        // The keySerializer FQCN
        ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos );
        dataPos += INT_SIZE + keySerializerBytes.limit();

        String keySerializerFqcn = "";

        if ( keySerializerBytes != null )
        {
            keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
        }

        // The valueSerialier FQCN
        ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos );

        String valueSerializerFqcn = "";
        dataPos += INT_SIZE + valueSerializerBytes.limit();

        if ( valueSerializerBytes != null )
        {
            valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
        }

        // The B-tree allowDuplicates flag
        int allowDuplicates = readInt( pageIos, dataPos );
        boolean dupsAllowed = allowDuplicates != 0;

        dataPos += INT_SIZE;

//        System.out.println( "\n  B-Tree " + btreeName );
//        System.out.println( "  ------------------------- " );

//        System.out.println( "    nbPageIOs[" + pageIos.length + "] = " + pageIoList );
        if ( LOG.isDebugEnabled() )
        {
            StringBuilder sb = new StringBuilder();
            boolean isFirst = true;

            for ( PageIO pageIo : pageIos )
            {
                if ( isFirst )
                {
                    isFirst = false;
                }
                else
                {
                    sb.append( ", " );
                }

                sb.append( "0x" ).append( Long.toHexString( pageIo.getOffset() ) );
            }

            String pageIoList = sb.toString();

            LOG.debug( "    PageIOs[{}] = {}", pageIos.length, pageIoList );

//        System.out.println( "    dataSize = "+ pageIos[0].getSize() );
            LOG.debug( "    dataSize = {}", pageIos[0].getSize() );

            LOG.debug( "    B-tree '{}'", btreeName );
            LOG.debug( "    revision : {}", revision );
            LOG.debug( "    nbElems : {}", nbElems );
            LOG.debug( "    rootPageOffset : 0x{}", Long.toHexString( rootPageOffset ) );
            LOG.debug( "    B-tree page size : {}", btreePageSize );
            LOG.debug( "    keySerializer : '{}'", keySerializerFqcn );
            LOG.debug( "    valueSerializer : '{}'", valueSerializerFqcn );
            LOG.debug( "    dups allowed : {}", dupsAllowed );
//
//        System.out.println( "    B-tree '" + btreeName + "'" );
//        System.out.println( "    revision : " + revision );
//        System.out.println( "    nbElems : " + nbElems );
//        System.out.println( "    rootPageOffset : 0x" + Long.toHexString( rootPageOffset ) );
//        System.out.println( "    B-tree page size : " + btreePageSize );
//        System.out.println( "    keySerializer : " + keySerializerFqcn );
//        System.out.println( "    valueSerializer : " + valueSerializerFqcn );
//        System.out.println( "    dups allowed : " + dupsAllowed );
        }

        return rootPageOffset;
    }


    /**
     * Get the number of managed trees. We don't count the CopiedPage B-tree and the B-tree of B-trees
     *
     * @return The number of managed B-trees
     */
    public int getNbManagedTrees()
    {
        return nbBtree;
    }


    /**
     * Get the managed B-trees. We don't return the CopiedPage B-tree nor the B-tree of B-trees.
     *
     * @return The managed B-trees
     */
    public Set<String> getManagedTrees()
    {
        Set<String> btrees = new HashSet<String>( managedBtrees.keySet() );

        return btrees;
    }


    /**
     * Stores the copied pages into the CopiedPages B-tree
     *
     * @param name The B-tree name
     * @param revision The revision
     * @param copiedPages The pages that have been copied while creating this revision
     * @throws IOException If we weren't able to store the data on disk
     */
    /* No Qualifier */ void storeCopiedPages( String name, long revision, long[] copiedPages ) throws IOException
    {
        RevisionName revisionName = new RevisionName( revision, name );

        copiedPageMap.put( revisionName, copiedPages );
    }


    /**
     * Fetch the rootPage of a given B-tree for a given revision.
     *
     * @param btree The B-tree we are interested in
     * @param revision The revision we want to get back
     * @return The rootPage for this B-tree and this revision, if any
     * @throws KeyNotFoundException If we can't find the rootPage for this revision and this B-tree
     * @throws IOException If we had an ise while accessing the data on disk
     */
    /* No qualifier */<K, V> Page<K, V> getRootPage( BTree<K, V> btree, long revision ) throws KeyNotFoundException,
        IOException
    {
        if ( btree.getRevision() == revision )
        {
            // We are asking for the current revision
            return btree.getRootPage();
        }

        // Get the B-tree header offset
        RevisionAndHeaderOffset revisionAndHeader = null;
        ValueCursor<RevisionAndHeaderOffset> cursor = btreeOfBtrees.getValues( btree.getName() );
        while( cursor.hasNext() )
        {
            revisionAndHeader = cursor.next();
            if( revisionAndHeader.getRevision() == revision )
            {
                break;
            }
        }

        cursor.close();
        
        if ( revisionAndHeader == null )
        {
            return null;
        }
        
        // get the B-tree rootPage
        Page<K, V> btreeRoot = readRootPage( btree, revisionAndHeader.getOffset() );

        return btreeRoot;
    }


    /**
     * Read a root page from the B-tree header offset
     */
    private <K, V> Page<K, V> readRootPage( BTree<K, V> btree, long btreeHeaderOffset ) throws EndOfFileExceededException, IOException
    {
        // Read the B-tree header pages on disk
        PageIO[] btreeHeaderPageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE );
        long dataPos = LONG_SIZE + LONG_SIZE;

        // The B-tree rootPage offset
        long rootPageOffset = readLong( btreeHeaderPageIos, dataPos );

        // Read the rootPage pages on disk
        PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );

        // Now, convert it to a Page
        Page<K, V> btreeRoot = readPage( btree, rootPageIos );

        return btreeRoot;
    }


    /**
     * Get one managed trees, knowing its name.
     *
     * @param name The B-tree name we are looking for
     * @return The managed B-trees
     */
    public <K, V> BTree<K, V> getManagedTree( String name )
    {
        return ( BTree<K, V> ) managedBtrees.get( name );
    }


    /**
     * Move a list of pages to the free page list. A logical page is associated with one
     * or more physical PageIOs, which are on the disk. We have to move all those PagIO instances
     * to the free list, and do the same in memory (we try to keep a reference to a set of
     * free pages.
     *
     * @param btree The B-tree which were owning the pages
     * @param revision The current revision
     * @param pages The pages to free
     * @throws IOException If we had a problem while updating the file
     * @throws EndOfFileExceededException If we tried to write after the end of the file
     */
    /* Package protected */<K, V> void freePages( BTree<K, V> btree, long revision, List<Page<K, V>> pages )
        throws EndOfFileExceededException, IOException
    {
        if ( ( pages == null ) || pages.isEmpty() )
        {
            return;
        }

        if ( !keepRevisions )
        {
            // if the B-tree doesn't keep revisions, we can safely move
            // the pages to the freed page list.
            if ( LOG.isDebugEnabled() )
            {
                LOG.debug( "Freeing the following pages :" );

                for ( Page<K, V> page : pages )
                {
                    LOG.debug(  "    {}", page );
                }
            }

            for ( Page<K, V> page : pages )
            {
                long pageOffset = ((AbstractPage<K, V>)page).getOffset();

                PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE );

                for ( PageIO pageIo : pageIos )
                {
                    freedPages.add( pageIo );
                }
            }
        }
        else
        {
            // We are keeping revisions of standard B-trees, so we move the pages to the CopiedPages B-tree
            // but only for non managed B-trees
            if ( LOG.isDebugEnabled() )
            {
                LOG.debug( "Moving the following pages to the CopiedBtree :" );

                for ( Page<K, V> page : pages )
                {
                    LOG.debug(  "    {}", page );
                }
            }

            long[] pageOffsets = new long[pages.size()];
            int pos = 0;

            for ( Page<K, V> page : pages )
            {
                pageOffsets[pos++] = ((AbstractPage<K, V>)page).offset;
            }

            if ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES )
            {
                // Deal with standard B-trees
                RevisionName revisionName = new RevisionName( revision, btree.getName() );

                copiedPageMap.put( revisionName, pageOffsets );
            }
            else
            {
                // Managed B-trees : we simply free the copied pages
                for ( long pageOffset : pageOffsets )
                {
                    PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE );

                    for ( PageIO pageIo : pageIos )
                    {
                        freedPages.add( pageIo );
                    }
                }
            }
        }
    }


    /**
     * Add a PageIO to the list of free PageIOs
     *
     * @param pageIo The page to free
     * @throws IOException If we weren't capable of updating the file
     */
    private void free( PageIO pageIo ) throws IOException
    {
        freePageLock.lock();
        
        // We add the Page's PageIOs before the
        // existing free pages.
        // Link it to the first free page
        pageIo.setNextPage( firstFreePage );

        LOG.debug( "Flushing the first free page" );

        // And flush it to disk
        //FIXME can be flushed last after releasing the lock
        pageIo.markFree();
        flushPages( pageIo );

        // We can update the firstFreePage offset
        firstFreePage = pageIo.getOffset();
        
        freePageLock.unlock();
    }


    /**
     * Add an array of PageIOs to the list of free PageIOs
     *
     * @param offsets The offsets of the pages whose associated PageIOs will be fetched and freed.
     * @throws IOException If we weren't capable of updating the file
     */
    public void free( long... offsets ) throws IOException
    {
        List<PageIO> pageIos = new ArrayList<PageIO>();
        int pageIndex = 0;
        for( int i=0; i < offsets.length; i++ )
        {
            PageIO[] ios = readPageIOs( offsets[i], Long.MAX_VALUE );
            for( PageIO io : ios )
            {
                pageIos.add( io );
                io.markFree();
                
                if( pageIndex > 0 )
                {
                    pageIos.get( pageIndex - 1 ).setNextPage( io.getOffset() );
                }
                
                pageIndex++;
            }
        }

        if( pageIos.isEmpty() )
        {
            return;
        }
        
        freePageLock.lock();
        
        // We add the Page's PageIOs before the
        // existing free pages.
        // Link it to the first free page
        pageIos.get( pageIndex -1 ).setNextPage( firstFreePage );

        LOG.debug( "Flushing the first free page" );

        // And flush it to disk
        //FIXME can be flushed last after releasing the lock
        flushPages( pageIos.toArray( new PageIO[0] ) );

        // We can update the firstFreePage offset
        firstFreePage = pageIos.get( 0 ).getOffset();
        
        freePageLock.unlock();
    }

    
    /**
     * @return the keepRevisions flag
     */
    public boolean isKeepRevisions()
    {
        return keepRevisions;
    }


    /**
     * @param keepRevisions the keepRevisions flag to set
     */
    public void setKeepRevisions( boolean keepRevisions )
    {
        this.keepRevisions = keepRevisions;
    }


    /**
     * Creates a B-tree and automatically adds it to the list of managed btrees
     *
     * @param name the name of the B-tree
     * @param keySerializer key serializer
     * @param valueSerializer value serializer
     * @param allowDuplicates flag for allowing duplicate keys
     * @return a managed B-tree
     * @throws IOException If we weren't able to update the file on disk
     * @throws BTreeAlreadyManagedException If the B-tree is already managed
     */
    @SuppressWarnings("all")
    public <K, V> BTree<K, V> addBTree( String name, ElementSerializer<K> keySerializer,
        ElementSerializer<V> valueSerializer, boolean allowDuplicates )
            throws IOException, BTreeAlreadyManagedException
    {
        PersistedBTreeConfiguration config = new PersistedBTreeConfiguration();

        config.setName( name );
        config.setKeySerializer( keySerializer );
        config.setValueSerializer( valueSerializer );
        config.setAllowDuplicates( allowDuplicates );

        BTree btree = new PersistedBTree( config );
        manage( btree );

        if ( LOG_CHECK.isDebugEnabled() )
        {
            MavibotInspector.check( this );
        }

        return btree;
    }

    
    /**
     * Add a newly closd transaction into the closed transaction queue
     */
    /* no qualifier */ <K, V> void releaseTransaction( ReadTransaction<K, V> readTransaction )
    {
        RevisionName revisionName = new RevisionName( 
            readTransaction.getRevision(), 
            readTransaction.getBtreeHeader().getBtree().getName() );
        //closedTransactionsQueue.add( revisionName );
    }
    
    
    /**
     * Get the current BTreeHeader for a given Btree. It might not exist
     */
    public BTreeHeader getBTreeHeader( String name )
    {
        // Get a lock
        btreeHeadersLock.readLock().lock();
        
        // get the current BTree Header for this BTree and revision
        BTreeHeader<?, ?> btreeHeader = currentBTreeHeaders.get( name );
        
        // And unlock 
        btreeHeadersLock.readLock().unlock();

        return btreeHeader;
    }
    
    
    /**
     * Get the new BTreeHeader for a given Btree. It might not exist
     */
    public BTreeHeader getNewBTreeHeader( String name )
    {
        // get the current BTree Header for this BTree and revision
        BTreeHeader<?, ?> btreeHeader = newBTreeHeaders.get( name );

        return btreeHeader;
    }
    
    
    /**
     * {@inheritDoc}
     */
    public void updateNewBTreeHeaders( BTreeHeader btreeHeader )
    {
        newBTreeHeaders.put( btreeHeader.getBtree().getName(), btreeHeader );
    }
    
    
    /**
     * Swap the current BtreeHeader map with the new one. This method will only
     * be called in a single trhead, when the current transaction will be committed.
     */
    private void swapCurrentBtreeHeaders()
    {
        // Copy the reference to the current BtreeHeader Map
        Map<String, BTreeHeader<?, ?>> tmp = currentBTreeHeaders;
        
        // Get a write lock
        btreeHeadersLock.writeLock().lock();

        // Swap the new BTreeHeader Map
        currentBTreeHeaders = newBTreeHeaders;
        
        // And unlock 
        btreeHeadersLock.writeLock().unlock();

        // Last, not least, clear the Map and reinject the latest revision in it
        tmp.clear();
        tmp.putAll( currentBTreeHeaders );

        // And update the new BTreeHeader map
        newBTreeHeaders = tmp;
    }
    
    
    /**
     * revert the new BTreeHeaders Map to the current BTreeHeader Map. This method
     * is called when we have to rollback a transaction.
     */
    private void revertBtreeHeaders()
    {
        // Clean up teh new BTreeHeaders Map
        newBTreeHeaders.clear();
        
        // Reinject the latest revision in it
        newBTreeHeaders.putAll( currentBTreeHeaders );
    }

    
    /**
     * Loads a B-tree holding the values of a duplicate key
     * This tree is also called as dups tree or sub tree
     *
     * @param offset the offset of the B-tree header
     * @return the deserialized B-tree
     */
    /* No qualifier */<K, V> BTree<V, V> loadDupsBtree( long btreeHeaderOffset, BTree<K, V> parentBtree )
    {
        try
        {
            PageIO[] pageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE );

            BTree<V, V> subBtree = BTreeFactory.<V, V> createPersistedBTree( BTreeTypeEnum.PERSISTED_SUB );
            loadBtree( pageIos, subBtree, parentBtree );
            

            return subBtree;
        }
        catch ( Exception e )
        {
            // should not happen
            throw new BTreeCreationException( e );
        }
    }


    /**
     * Deletes a BTree and any sub-btrees associated with it from the disk.
     * 
     * @param name the name of the managed B-tree
     * @return the number of AbstractPages that were freed after deleting
     */
    @SuppressWarnings("all")
    public int deleteBTree( String name )
    {
        PersistedBTree tree = ( PersistedBTree ) getManagedTree( name );

        if ( tree == null )
        {
            LOG.warn( "No managed btree exists with the name {}", name );
            return -1; 
        }
        
        managedBtrees.remove( name );
        
        int pageCount = 0;
        pageCount = _deleteBTree( tree, pageCount );
        
        // now remove the revisions from BoB
        try
        {
            ValueCursor<RevisionAndHeaderOffset> cursor = btreeOfBtrees.getValues( name );
            
            if ( cursor instanceof ValueBTreeCursor )
            {
                PersistedBTree subBtree = ( PersistedBTree ) ( ( ValueBTreeCursor ) cursor ).getValueBTree();
                pageCount += _deleteBTree( subBtree, pageCount );
            }
            
            cursor.close();
            
            btreeOfBtrees.delete( name );
        }
        catch( Exception e )
        {
            //ignore
        }
        
        System.out.println( "Freed " + pageCount + " AbstractPages" );
        
        return pageCount;
    }
    
    
    /**
     * Deletes the given BTree from the disk and updates the pageCount
     * with the number of AbstractPages freed.
     * 
     * @param tree the BTree to be deleted
     * @param pageCount variable holding the number of AbstractPages freed
     * @return the number of AbstractPages that were freed after deleting
     */
    @SuppressWarnings("all")
    private int _deleteBTree( PersistedBTree tree, int pageCount )
    {
        Stack<AbstractPage> stack = new Stack<AbstractPage>();
        
        // use post-order traversal to free all the pages
        AbstractPage root = ( AbstractPage ) tree.getRootPage();
        stack.push( root );
        
        try
        {
            while( !stack.isEmpty() )
            {
                AbstractPage page = stack.peek();
                
                if( page instanceof PersistedLeaf )
                {
                    stack.pop();
                    pageCount++;
                    
                    PersistedLeaf leaf = ( PersistedLeaf ) page;
                    if( tree.isAllowDuplicates() )
                    {
                        for( ValueHolder vh : leaf.values )
                        {
                            if( vh.isSubBtree() )
                            {
                                pageCount += _deleteBTree( ( PersistedBTree ) ( ( PersistedValueHolder ) vh ).valueBtree, pageCount );
                            }
                        }
                    }
                    
                    free( page.offset );
                    
                    if ( !stack.isEmpty() )
                    {
                        stack.peek().nbElems--;
                    }
                }
                else if ( page instanceof PersistedNode )
                {
                    if ( page.nbElems <= 0 )
                    {
                        stack.pop();
                        pageCount++;
                        
                        free( page.offset );
                    }
                    else
                    {
                        // the will be nbElems + 1 children
                        for ( int i = page.nbElems; i >= 0; i-- )
                        {
                            stack.push( ( AbstractPage ) page.children[i].getValue() );
                        }
                    }
                }
            }
            
            free( tree.getBtreeHeader().getBTreeHeaderOffset() );
            pageCount++;
        }
        catch( IOException e )
        {
            LOG.warn( "Failed to free the pages of the tree {}", tree.getName() );
            LOG.warn( "", e );
            
            return pageCount;
        }

        return pageCount;
    }
    
    
    @SuppressWarnings("all")
    /* No qualifier */boolean chopTree( PersistedBTree tree, Object key)
    {
        AbstractPage root = ( AbstractPage ) tree.getRootPage();
        
        try
        {
            int pageCount = 0;
            Stack<ParentPos> stack = new Stack<ParentPos>();
            AbstractPage page = root;
            
            while( true )
            {
                int pos = page.findPos( key );
                
                ParentPos pp = new ParentPos( page, pos );
                stack.push( pp );
                
                if( pos < 0 )
                {
                    if( page instanceof PersistedNode )
                    {
                        page = ( AbstractPage ) page.children[-(pos+1) + 1].getValue();
                        pos = page.findPos( key );
                        stack.push( new ParentPos( page, pos ) );
                    }
                    
                    break;
                }

                // no key found
                if( page instanceof PersistedLeaf )
                {
                    return false;
                }
                
                page = ( AbstractPage ) page.children[pos].getValue();
            }

            long keysToRemove = countKeysUpto( key, tree );
            
            boolean isSubTree = ( tree.getType() == PERSISTED_SUB );
            
            PersistedPageHolder pageHolder = null;
            
            Object leftKey = null;
            
            long revision = tree.getRevision();
            
            while( !stack.isEmpty() )
            {
                ParentPos pp = stack.pop();
                
                page = ( AbstractPage ) pp.page;
                
                if( page instanceof PersistedLeaf )
                {
                    pageCount++;
                    
                    PersistedLeaf leaf = ( PersistedLeaf ) page;
                    int actPos = -( pp.pos + 1 );

                    int nbElems = leaf.nbElems - actPos - 1;
                    
                    if( nbElems != 0 )
                    {
                        PersistedLeaf newLeaf = ( PersistedLeaf ) BTreeFactory.createLeaf( tree, revision, nbElems );
                        // Copy the keys and the values
                        System.arraycopy( leaf.keys, actPos + 1, newLeaf.keys, 0, nbElems );
                        
                        if ( !isSubTree )
                        {
                            for ( int i = 0; i < nbElems; i++ )
                            {
                                try
                                {
                                    newLeaf.values[i] = leaf.values[actPos + i + 1].clone();
                                }
                                catch ( CloneNotSupportedException e )
                                {
                                    // shouldn't happen
                                    e.printStackTrace();
                                }
                            }    
                        }
                        
                        pageHolder = ( PersistedPageHolder ) writePage( tree, newLeaf, revision );
                        leftKey = newLeaf.keys[0];
                    }
                    else
                    {
                        if ( page.offset == root.offset )
                        {
                            PersistedLeaf newLeaf = ( PersistedLeaf ) BTreeFactory.createLeaf( tree, revision, nbElems );
                            pageHolder = ( PersistedPageHolder ) writePage( tree, newLeaf, revision );
                        }
                    }
                    
                    if( tree.isAllowDuplicates() )
                    {
                        for ( int i = 0; i <= actPos; i++ )
                        {
                            ValueHolder vh = leaf.values[i];
                            
                            if( vh.isSubBtree() )
                            {
                                pageCount += _deleteBTree( ( PersistedBTree ) ( ( PersistedValueHolder ) vh ).valueBtree, pageCount );
                            }
                        }
                    }
                    
                    free( page.offset );
                }
                else if ( page instanceof PersistedNode )
                {
                    int actPos = pp.pos;
                    
                    //TODO not completed yet
                    PersistedNode node = ( PersistedNode ) page;
                    if ( pp.pos < 0 )
                    {
                        actPos = -( actPos + 1 );
                    }
                    
                    int nbElems = node.nbElems;
                    int copyStartPos = actPos;
                    
                    // no child to point to
                    if( pageHolder == null )
                    {
                        nbElems--;
                        copyStartPos = actPos + 1;
                    }
                    else if ( pp.pos < 0 )
                    {
                        nbElems = node.nbElems - actPos - 1;
                        copyStartPos = actPos + 1;
                    }
                    
                    if ( ( pageHolder != null ) && ( nbElems == 0 ) )
                    {
                        if( node.children.length == 2 )
                        {
                            nbElems = 1;
                        }
                    }
                    
                    if( nbElems != 0 )
                    {
                        PersistedNode newNode = ( PersistedNode ) BTreeFactory.createNode( tree, revision, nbElems );
                        // Copy the keys and the values
                        System.arraycopy( node.keys, copyStartPos, newNode.keys, 0, nbElems );
                        
                        System.arraycopy( node.children, copyStartPos, newNode.children, 0, nbElems + 1 );
                        
                        if( pageHolder != null )
                        {
                            newNode.children[0] = pageHolder;
                        }
                        
                        pageHolder = ( PersistedPageHolder ) writePage( tree, newNode, revision );
                        leftKey = newNode.keys[0];
                        
                        free( page.offset );
                    }
                    else
                    {
                        free( page.offset );
                        
                        if ( page.offset == root.offset )
                        {
                            pageHolder = new PersistedPageHolder( tree, page.children[actPos + 1].getValue() );
                        }
                        else
                        {
                            PersistedNode newNode = ( PersistedNode ) BTreeFactory.createNode( tree, revision, 1 );
                            //newNode.keys[0].key = leftKey;
                        }
                    }
                }
            }
            
            //update btree header with the new root-page's offset
            BTreeHeader header = tree.getBtreeHeader();
            
            header.setRootPage( pageHolder.getValue() );
            header.setRevision( tree.getRevision() );
            
            // compute 
            header.setNbElems( tree.getNbElems() - keysToRemove );
            
            writeBtreeHeader( tree, header );
        }
        catch( IOException e )
        {
            LOG.warn( "Failed to free the pages of the tree {}", tree.getName() );
            LOG.warn( "", e );
            return false;
        }
        
        return true;
    }
    

    /**
     * counts the number of keys present upto and including the given key
     */
    /** no qualifier */ long countKeysUpto( Object key, BTree tree )
    {
        // count number of keys to be removed
        long numKeysToRemove = 0;
        
        Stack<AbstractPage> pageStack = new Stack<AbstractPage>();
        
        AbstractPage root = ( AbstractPage ) tree.getRootPage();
        pageStack.push( root );
        
        int childCount = 0;
        
        while( !pageStack.isEmpty() )
        {
            AbstractPage abstractPage = pageStack.peek();
            
            if( abstractPage instanceof PersistedLeaf )
            {
                pageStack.pop();
                
                PersistedLeaf leaf = ( PersistedLeaf ) abstractPage;
                
                int pos = leaf.findPos( key );
                boolean found = ( pos < 0 ? true : false );
                if ( found )
                {
                    pos = -(pos + 1);
                }
                
                if( tree.isAllowDuplicates() )
                {
                    for( int i=0; i < leaf.values.length; i++ )
                    {
                        if( found && ( i > pos ) )
                        {
                            break;
                        }
                        ValueHolder vh = leaf.values[i];
                        numKeysToRemove += ( ( PersistedValueHolder ) vh ).size();
                    }
                }
                else
                {
                    if( found )
                    {
                        numKeysToRemove += (pos + 1);
                    }
                    else
                    {
                        numKeysToRemove += leaf.nbElems;
                    }
                }
                
                if ( found )
                {
                    break;
                }
                
                if ( !pageStack.isEmpty() )
                {
                    childCount++;
                }
            }
            else if ( abstractPage instanceof PersistedNode )
            {
                if ( abstractPage.nbElems == ( childCount - 1 ) )
                {
                    pageStack.pop();
                    childCount = 0;
                }
                else
                {
                    for ( int i = abstractPage.nbElems; i >= 0; i-- )
                    {
                        pageStack.push( ( AbstractPage ) abstractPage.children[i].getValue() );
                    }
                }
            }
        }
        
        return numKeysToRemove;
    }
    
    
    /**
     * @see Object#toString()
     */
    public String toString()
    {
        StringBuilder sb = new StringBuilder();

        sb.append( "RM free pages : [" );

        if ( firstFreePage != NO_PAGE )
        {
            long current = firstFreePage;
            boolean isFirst = true;

            while ( current != NO_PAGE )
            {
                if ( isFirst )
                {
                    isFirst = false;
                }
                else
                {
                    sb.append( ", " );
                }

                PageIO pageIo;

                try
                {
                    pageIo = fetchPage( current );
                    sb.append( pageIo.getOffset() );
                    current = pageIo.getNextPage();
                }
                catch ( EndOfFileExceededException e )
                {
                    e.printStackTrace();
                }
                catch ( IOException e )
                {
                    e.printStackTrace();
                }

            }
        }

        sb.append( "]" );

        return sb.toString();
    }
}
