| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.directory.mavibot.btree; |
| |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.directory.mavibot.btree.exception.BTreeAlreadyManagedException; |
| import org.apache.directory.mavibot.btree.exception.BTreeCreationException; |
| import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException; |
| import org.apache.directory.mavibot.btree.exception.FileException; |
| import org.apache.directory.mavibot.btree.exception.InvalidOffsetException; |
| import org.apache.directory.mavibot.btree.exception.KeyNotFoundException; |
| import org.apache.directory.mavibot.btree.exception.RecordManagerException; |
| import org.apache.directory.mavibot.btree.serializer.ElementSerializer; |
| import org.apache.directory.mavibot.btree.serializer.IntSerializer; |
| import org.apache.directory.mavibot.btree.serializer.LongSerializer; |
| import org.apache.directory.mavibot.btree.util.Strings; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
/**
 * The RecordManager is used to manage the file in which we will store the B-trees.
 * A RecordManager will manage more than one B-tree.<br/>
 *
 * It stores data in fixed size pages (default size is 512 bytes), which may be linked one to
 * the other if the data we want to store is too big for a page.
 *
 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
 */
public class RecordManager extends AbstractTransactionManager
{
    /** The Logger used by this class */
    protected static final Logger LOG = LoggerFactory.getLogger( RecordManager.class );

    /** The Logger used to trace TXN operations */
    protected static final Logger TXN_LOG = LoggerFactory.getLogger( "TXN_LOG" );

    /** The Logger dedicated to page-level traces */
    protected static final Logger LOG_PAGES = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_PAGES" );

    /** A dedicated logger for the check */
    protected static final Logger LOG_CHECK = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_CHECK" );

    /** The associated file */
    private File file;

    /** The channel used to read and write data */
    /* no qualifier */FileChannel fileChannel;

    /** The number of managed B-trees */
    /* no qualifier */int nbBtree;

    /** The offset of the first free page (NO_PAGE when there is none) */
    /* no qualifier */long firstFreePage;

    /** Some counters to track the number of free pages */
    public AtomicLong nbFreedPages = new AtomicLong( 0 );
    public AtomicLong nbCreatedPages = new AtomicLong( 0 );
    public AtomicLong nbReusedPages = new AtomicLong( 0 );
    public AtomicLong nbUpdateRMHeader = new AtomicLong( 0 );
    public AtomicLong nbUpdateBtreeHeader = new AtomicLong( 0 );
    public AtomicLong nbUpdatePageIOs = new AtomicLong( 0 );

    /** The offset of the end of the file */
    private long endOfFileOffset;

    /**
     * A Map used to hold the pages that were copied in a new version.
     * Those pages can be reclaimed when the associated version is dead.
     *
     * Note: the stored offsets are those of AbstractPages; when freeing,
     * the associated PageIOs will be fetched and freed.
     **/
    /* no qualifier */Map<RevisionName, long[]> copiedPageMap = null;

    /** A constant for an offset on a non existing page */
    public static final long NO_PAGE = -1L;

    /** NOTE(review): named PAGE_SIZE but set to 4 — looks like the byte size of the
     * page-size header field rather than a page capacity; confirm before relying on it */
    private static final int PAGE_SIZE = 4;

    /** The size of the link to next page */
    private static final int LINK_SIZE = 8;

    /** Some constants */
    private static final int BYTE_SIZE = 1;
    /* no qualifier */static final int INT_SIZE = 4;
    /* no qualifier */static final int LONG_SIZE = 8;

    /** The default page size */
    public static final int DEFAULT_PAGE_SIZE = 512;

    /** The minimal page size. Can't be below 64, as we have to store many things in the RMHeader */
    private static final int MIN_PAGE_SIZE = 64;

    /** The RecordManager header size */
    /* no qualifier */static int RECORD_MANAGER_HEADER_SIZE = DEFAULT_PAGE_SIZE;

    /** A global buffer used to store the RecordManager header */
    private ByteBuffer RECORD_MANAGER_HEADER_BUFFER;

    /** A static buffer used to store the RecordManager header */
    private byte[] RECORD_MANAGER_HEADER_BYTES;

    /** The length of an Offset, as a negative value (-8 encoded on four bytes) */
    private byte[] LONG_LENGTH = new byte[]
        { ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xF8 };

    /** The RecordManager underlying page size. */
    /* no qualifier */int pageSize = DEFAULT_PAGE_SIZE;

    /** The set of managed B-trees, keyed by name */
    private Map<String, BTree<Object, Object>> managedBtrees;

    /** The queue of recently closed transactions */
    private Queue<RevisionName> closedTransactionsQueue = new LinkedBlockingQueue<RevisionName>();

    /** The default file name */
    private static final String DEFAULT_FILE_NAME = "mavibot.db";

    /** A flag set to true if we want to keep old revisions */
    private boolean keepRevisions;

    /** A flag used by internal btrees */
    public static final boolean INTERNAL_BTREE = true;

    /** A flag used by internal btrees */
    public static final boolean NORMAL_BTREE = false;

    /** The B-tree of B-trees */
    private BTree<NameRevision, Long> btreeOfBtrees;

    /** The B-tree of B-trees management btree name */
    /* no qualifier */static final String BTREE_OF_BTREES_NAME = "_btree_of_btrees_";

    /** The CopiedPages management btree name */
    /* no qualifier */static final String COPIED_PAGE_BTREE_NAME = "_copiedPageBtree_";

    /** The current B-tree of B-trees header offset */
    /* no qualifier */long currentBtreeOfBtreesOffset;

    /** The previous B-tree of B-trees header offset */
    private long previousBtreeOfBtreesOffset = NO_PAGE;

    /** A lock to protect the transaction handling */
    private ReentrantLock transactionLock = new ReentrantLock();

    /** A ThreadLocal used to store the current transaction nesting level per thread */
    private static final ThreadLocal<Integer> context = new ThreadLocal<Integer>();

    /** The list of PageIO that can be freed after a commit */
    List<PageIO> freedPages = new ArrayList<PageIO>();

    /** The list of PageIO that can be freed after a rollback */
    private List<PageIO> allocatedPages = new ArrayList<PageIO>();

    /** A Map keeping the latest revisions for each managed BTree */
    private Map<String, BTreeHeader<?, ?>> currentBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();

    /** A Map storing the new revisions when some change have been made in some BTrees */
    private Map<String, BTreeHeader<?, ?>> newBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();

    /** A lock to protect the BtreeHeader maps */
    private ReadWriteLock btreeHeadersLock = new ReentrantReadWriteLock();

    /** A value stored into the transaction context for rollbacked transactions */
    private static final int ROLLBACKED_TXN = 0;

    /** A lock to protect the freepage pointers */
    private ReentrantLock freePageLock = new ReentrantLock();

    /** the space reclaimer */
    private SpaceReclaimer reclaimer;

    /** variable to keep track of the write commit count */
    private int commitCount = 0;

    /** the threshold at which the SpaceReclaimer will be run to free the copied pages */
    private int spaceReclaimerThreshold = 200;

    /** Tracks how many times each page offset has been written (diagnostics) */
    public Map<Long, Integer> writeCounter = new HashMap<Long, Integer>();
| |
| |
    /**
     * Create a RecordManager which will either create the underlying file
     * or load an existing one. If a folder is provided, then we will create
     * a file with a default name : mavibot.db
     *
     * @param fileName The file name, or a folder name
     */
    public RecordManager( String fileName )
    {
        this( fileName, DEFAULT_PAGE_SIZE );
    }
| |
| |
    /**
     * Create a RecordManager which will either create the underlying file
     * or load an existing one. If a folder is provided, then we will create
     * a file with a default name : mavibot.db
     *
     * @param fileName The file name, or a folder name
     * @param pageSize the size of a page on disk, in bytes (clamped to MIN_PAGE_SIZE)
     * @throws RecordManagerException if the file cannot be opened, initialized or loaded
     */
    public RecordManager( String fileName, int pageSize )
    {
        managedBtrees = new LinkedHashMap<String, BTree<Object, Object>>();

        // Never go below the minimal page size : the RMHeader must fit in one page
        if ( pageSize < MIN_PAGE_SIZE )
        {
            this.pageSize = MIN_PAGE_SIZE;
        }
        else
        {
            this.pageSize = pageSize;
        }

        // The header buffers are sized on the effective page size
        RECORD_MANAGER_HEADER_BUFFER = ByteBuffer.allocate( this.pageSize );
        RECORD_MANAGER_HEADER_BYTES = new byte[this.pageSize];
        RECORD_MANAGER_HEADER_SIZE = this.pageSize;

        // Open the file or create it
        File tmpFile = new File( fileName );

        if ( tmpFile.isDirectory() )
        {
            // It's a directory. Check that we don't have an existing mavibot file
            tmpFile = new File( tmpFile, DEFAULT_FILE_NAME );
        }

        // We have to create a new file, if it does not already exist
        boolean isNewFile = createFile( tmpFile );

        try
        {
            // NOTE(review): the RandomAccessFile is never closed explicitly; its channel
            // is kept in fileChannel, so closing the channel releases it — confirm
            RandomAccessFile randomFile = new RandomAccessFile( file, "rw" );
            fileChannel = randomFile.getChannel();

            // get the current end of file offset
            endOfFileOffset = fileChannel.size();

            if ( isNewFile )
            {
                // Brand new database : write the initial header and internal B-trees
                initRecordManager();
            }
            else
            {
                // Existing database : read the header and reload all the B-trees
                loadRecordManager();
            }

            reclaimer = new SpaceReclaimer( this );

            copiedPageMap = reclaimer.readCopiedPageMap( file.getParentFile() );
            runReclaimer();
        }
        catch ( Exception e )
        {
            LOG.error( "Error while initializing the RecordManager : {}", e.getMessage() );
            LOG.error( "", e );
            throw new RecordManagerException( e );
        }
    }
| |
| |
| /** |
| * runs the SpaceReclaimer to free the copied pages |
| */ |
| private void runReclaimer() |
| { |
| try |
| { |
| commitCount = 0; |
| reclaimer.reclaim(); |
| } |
| catch ( Exception e ) |
| { |
| LOG.warn( "SpaceReclaimer failed to free the pages", e ); |
| } |
| } |
| |
| |
| /** |
| * Create the mavibot file if it does not exist |
| */ |
| private boolean createFile( File mavibotFile ) |
| { |
| try |
| { |
| boolean creation = mavibotFile.createNewFile(); |
| |
| file = mavibotFile; |
| |
| if ( mavibotFile.length() == 0 ) |
| { |
| return true; |
| } |
| else |
| { |
| return creation; |
| } |
| } |
| catch ( IOException ioe ) |
| { |
| LOG.error( "Cannot create the file {}", mavibotFile.getName() ); |
| return false; |
| } |
| } |
| |
| |
    /**
     * We will create a brand new RecordManager file, containing nothing, but the RecordManager header,
     * a B-tree to manage the old revisions we want to keep and
     * a B-tree used to manage pages associated with old versions.
     * <br/>
     * The RecordManager header contains the following details :
     * <pre>
     * +--------------------------+
     * | PageSize                 | 4 bytes : The size of a physical page (default to 4096)
     * +--------------------------+
     * | NbTree                   | 4 bytes : The number of managed B-trees (at least 1)
     * +--------------------------+
     * | FirstFree                | 8 bytes : The offset of the first free page
     * +--------------------------+
     * | current BoB offset       | 8 bytes : The offset of the current BoB
     * +--------------------------+
     * | previous BoB offset      | 8 bytes : The offset of the previous BoB
     * +--------------------------+
     * | current CP btree offset  | 8 bytes : The offset of the current CopiedPages B-tree
     * +--------------------------+
     * | previous CP btree offset | 8 bytes : The offset of the previous CopiedPages B-tree
     * +--------------------------+
     * </pre>
     *
     * We then store the B-tree managing the pages that have been copied when we have added
     * or deleted an element in the B-tree. They are associated with a version.
     *
     * Last, we add the bTree that keep a track on each revision we can have access to.
     *
     * @throws IOException if the header or the internal B-trees cannot be written
     */
    private void initRecordManager() throws IOException
    {
        // Create a new Header
        nbBtree = 0;
        firstFreePage = NO_PAGE;
        currentBtreeOfBtreesOffset = NO_PAGE;

        updateRecordManagerHeader();

        // Set the offset of the end of the file
        endOfFileOffset = fileChannel.size();

        // First, create the btree of btrees <NameRevision, Long>
        createBtreeOfBtrees();

        // Inject these B-trees into the RecordManager. They are internal B-trees.
        try
        {
            manageSubBtree( btreeOfBtrees );

            currentBtreeOfBtreesOffset = ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader()
                .getBTreeHeaderOffset();
            updateRecordManagerHeader();

            // Inject the BtreeOfBtrees into the currentBtreeHeaders map
            currentBTreeHeaders.put( BTREE_OF_BTREES_NAME,
                ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader() );
            newBTreeHeaders.put( BTREE_OF_BTREES_NAME,
                ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader() );
        }
        catch ( BTreeAlreadyManagedException btame )
        {
            // Can't happen here : the BoB is the very first managed B-tree.
        }

        // We are all set ! Verify the file
        if ( LOG_CHECK.isDebugEnabled() )
        {
            MavibotInspector.check( this );
        }

    }
| |
| |
| /** |
| * Create the B-treeOfBtrees |
| */ |
| private void createBtreeOfBtrees() |
| { |
| PersistedBTreeConfiguration<NameRevision, Long> configuration = new PersistedBTreeConfiguration<NameRevision, Long>(); |
| configuration.setKeySerializer( NameRevisionSerializer.INSTANCE ); |
| configuration.setName( BTREE_OF_BTREES_NAME ); |
| configuration.setValueSerializer( LongSerializer.INSTANCE ); |
| configuration.setBtreeType( BTreeTypeEnum.BTREE_OF_BTREES ); |
| configuration.setCacheSize( PersistedBTree.DEFAULT_CACHE_SIZE ); |
| |
| btreeOfBtrees = BTreeFactory.createPersistedBTree( configuration ); |
| } |
| |
| |
| /** |
| * Load the BTrees from the disk. |
| * |
| * @throws InstantiationException |
| * @throws IllegalAccessException |
| * @throws ClassNotFoundException |
| * @throws NoSuchFieldException |
| * @throws SecurityException |
| * @throws IllegalArgumentException |
| */ |
| private void loadRecordManager() throws IOException, ClassNotFoundException, IllegalAccessException, |
| InstantiationException, IllegalArgumentException, SecurityException, NoSuchFieldException, KeyNotFoundException |
| { |
| if ( fileChannel.size() != 0 ) |
| { |
| ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE ); |
| |
| // The file exists, we have to load the data now |
| fileChannel.read( recordManagerHeader ); |
| |
| recordManagerHeader.rewind(); |
| |
| // read the RecordManager Header : |
| // +---------------------+ |
| // | PageSize | 4 bytes : The size of a physical page (default to 4096) |
| // +---------------------+ |
| // | NbTree | 4 bytes : The number of managed B-trees (at least 1) |
| // +---------------------+ |
| // | FirstFree | 8 bytes : The offset of the first free page |
| // +---------------------+ |
| // | current BoB offset | 8 bytes : The offset of the current B-tree of B-trees |
| // +---------------------+ |
| // | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees |
| // +---------------------+ |
| // | current CP offset | 8 bytes : The offset of the current Copied Pages B-tree |
| // +---------------------+ |
| // | previous CP offset | 8 bytes : The offset of the previous Copied Pages B-tree |
| // +---------------------+ |
| |
| // The page size |
| pageSize = recordManagerHeader.getInt(); |
| |
| // The number of managed B-trees |
| nbBtree = recordManagerHeader.getInt(); |
| |
| // The first and last free page |
| firstFreePage = recordManagerHeader.getLong(); |
| |
| // Read all the free pages |
| checkFreePages(); |
| |
| // The current BOB offset |
| currentBtreeOfBtreesOffset = recordManagerHeader.getLong(); |
| |
| // The previous BOB offset |
| previousBtreeOfBtreesOffset = recordManagerHeader.getLong(); |
| |
| // read the B-tree of B-trees |
| PageIO[] bobHeaderPageIos = readPageIOs( currentBtreeOfBtreesOffset, Long.MAX_VALUE ); |
| |
| btreeOfBtrees = BTreeFactory.<NameRevision, Long> createPersistedBTree( BTreeTypeEnum.BTREE_OF_BTREES ); |
| //BTreeFactory.<NameRevision, Long> setBtreeHeaderOffset( ( PersistedBTree<NameRevision, Long> )btreeOfBtrees, currentBtreeOfBtreesOffset ); |
| |
| loadBtree( bobHeaderPageIos, btreeOfBtrees ); |
| |
| // Now, read all the B-trees from the btree of btrees |
| TupleCursor<NameRevision, Long> btreeCursor = btreeOfBtrees.browse(); |
| Map<String, Long> loadedBtrees = new HashMap<String, Long>(); |
| |
| // loop on all the btrees we have, and keep only the latest revision |
| long currentRevision = -1L; |
| |
| while ( btreeCursor.hasNext() ) |
| { |
| Tuple<NameRevision, Long> btreeTuple = btreeCursor.next(); |
| NameRevision nameRevision = btreeTuple.getKey(); |
| long btreeOffset = btreeTuple.getValue(); |
| long revision = nameRevision.getValue(); |
| |
| // Check if we already have processed this B-tree |
| Long loadedBtreeRevision = loadedBtrees.get( nameRevision.getName() ); |
| |
| if ( loadedBtreeRevision != null ) |
| { |
| // The btree has already been loaded. The revision is necessarily higher |
| if ( revision > currentRevision ) |
| { |
| // We have a newer revision : switch to the new revision (we keep the offset atm) |
| loadedBtrees.put( nameRevision.getName(), btreeOffset ); |
| currentRevision = revision; |
| } |
| } |
| else |
| { |
| // This is a new B-tree |
| loadedBtrees.put( nameRevision.getName(), btreeOffset ); |
| currentRevision = nameRevision.getRevision(); |
| } |
| } |
| |
| // TODO : clean up the old revisions... |
| |
| // Now, we can load the real btrees using the offsets |
| for ( String btreeName : loadedBtrees.keySet() ) |
| { |
| long btreeOffset = loadedBtrees.get( btreeName ); |
| |
| PageIO[] btreePageIos = readPageIOs( btreeOffset, Long.MAX_VALUE ); |
| |
| BTree<?, ?> btree = BTreeFactory.<NameRevision, Long> createPersistedBTree(); |
| //( ( PersistedBTree<NameRevision, Long> ) btree ).setBtreeHeaderOffset( btreeOffset ); |
| loadBtree( btreePageIos, btree ); |
| |
| // Add the btree into the map of managed B-trees |
| managedBtrees.put( btreeName, ( BTree<Object, Object> ) btree ); |
| } |
| |
| // We are done ! Let's finish with the last initialization parts |
| endOfFileOffset = fileChannel.size(); |
| } |
| } |
| |
| |
| /** |
| * Starts a transaction |
| */ |
| public void beginTransaction() |
| { |
| if ( TXN_LOG.isDebugEnabled() ) |
| { |
| TXN_LOG.debug( "Begining a new transaction on thread {}, TxnLevel {}", |
| Thread.currentThread().getName(), getTxnLevel() ); |
| } |
| |
| // First, take the lock if it's not already taken |
| if ( !( ( ReentrantLock ) transactionLock ).isHeldByCurrentThread() ) |
| { |
| TXN_LOG.debug( "--> Lock taken" ); |
| transactionLock.lock(); |
| } |
| else |
| { |
| TXN_LOG.debug( "..o The current thread already holds the lock" ); |
| } |
| |
| // Now, check the TLS state |
| incrementTxnLevel(); |
| } |
| |
| |
| /** |
| * Commits a transaction |
| */ |
| public void commit() |
| { |
| // We *must* own the transactionLock |
| if ( !transactionLock.isHeldByCurrentThread() ) |
| { |
| TXN_LOG.error( "This thread does not hold the transactionLock" ); |
| throw new RecordManagerException( "This thread does not hold the transactionLock" ); |
| } |
| |
| if ( TXN_LOG.isDebugEnabled() ) |
| { |
| TXN_LOG.debug( "Committing a transaction on thread {}, TxnLevel {}", |
| Thread.currentThread().getName(), getTxnLevel() ); |
| } |
| |
| if ( !fileChannel.isOpen() ) |
| { |
| // Still we have to decrement the TransactionLevel |
| int txnLevel = decrementTxnLevel(); |
| |
| if ( txnLevel == 0 ) |
| { |
| // We can safely release the lock |
| // The file has been closed, nothing remains to commit, let's get out |
| transactionLock.unlock(); |
| } |
| |
| return; |
| } |
| |
| int nbTxnStarted = context.get(); |
| |
| switch ( nbTxnStarted ) |
| { |
| case ROLLBACKED_TXN: |
| // The transaction was rollbacked, quit immediatelly |
| transactionLock.unlock(); |
| |
| return; |
| |
| case 1: |
| // We are done with the transaction, we can update the RMHeader and swap the BTreeHeaders |
| // First update the RMHeader to be sure that we have a way to restore from a crash |
| updateRecordManagerHeader(); |
| |
| // Swap the BtreeHeaders maps |
| swapCurrentBtreeHeaders(); |
| |
| // We can now free pages |
| for ( PageIO pageIo : freedPages ) |
| { |
| try |
| { |
| free( pageIo ); |
| } |
| catch ( IOException ioe ) |
| { |
| throw new RecordManagerException( ioe.getMessage() ); |
| } |
| } |
| |
| // Release the allocated and freed pages list |
| freedPages.clear(); |
| allocatedPages.clear(); |
| |
| // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers |
| // here, we have to erase the old references to keep only the new ones. |
| updateRecordManagerHeader(); |
| |
| commitCount++; |
| |
| if ( commitCount >= spaceReclaimerThreshold ) |
| { |
| runReclaimer(); |
| } |
| |
| // Finally, decrement the number of started transactions |
| // and release the global lock if possible |
| int txnLevel = decrementTxnLevel(); |
| |
| if ( txnLevel == 0 ) |
| { |
| transactionLock.unlock(); |
| } |
| |
| return; |
| |
| default: |
| // We are inner an existing transaction. Just update the necessary elements |
| // Update the RMHeader to be sure that we have a way to restore from a crash |
| updateRecordManagerHeader(); |
| |
| // Swap the BtreeHeaders maps |
| //swapCurrentBtreeHeaders(); |
| |
| // We can now free pages |
| for ( PageIO pageIo : freedPages ) |
| { |
| try |
| { |
| free( pageIo ); |
| } |
| catch ( IOException ioe ) |
| { |
| throw new RecordManagerException( ioe.getMessage() ); |
| } |
| } |
| |
| // Release the allocated and freed pages list |
| freedPages.clear(); |
| allocatedPages.clear(); |
| |
| // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers |
| // here, we have to erase the old references to keep only the new ones. |
| updateRecordManagerHeader(); |
| |
| commitCount++; |
| |
| if ( commitCount >= spaceReclaimerThreshold ) |
| { |
| runReclaimer(); |
| } |
| |
| // Finally, decrement the number of started transactions |
| // and release the global lock |
| txnLevel = decrementTxnLevel(); |
| |
| if ( txnLevel == 0 ) |
| { |
| transactionLock.unlock(); |
| } |
| |
| return; |
| } |
| } |
| |
| |
| public boolean isContextOk() |
| { |
| return ( context == null ? true : ( context.get() == 0 ) ); |
| } |
| |
| |
| /** |
| * Get the transactionLevel, ie the number of encapsulated update ops |
| */ |
| private int getTxnLevel() |
| { |
| Integer nbTxnLevel = context.get(); |
| |
| if ( nbTxnLevel == null ) |
| { |
| return -1; |
| } |
| |
| return nbTxnLevel; |
| } |
| |
| |
| /** |
| * Increment the transactionLevel |
| */ |
| private void incrementTxnLevel() |
| { |
| Integer nbTxnLevel = context.get(); |
| |
| if ( nbTxnLevel == null ) |
| { |
| context.set( 1 ); |
| } |
| else |
| { |
| // And increment the counter of inner txn. |
| context.set( nbTxnLevel + 1 ); |
| } |
| |
| if ( TXN_LOG.isDebugEnabled() ) |
| { |
| TXN_LOG.debug( "Incrementing the TxnLevel : {}", context.get() ); |
| } |
| } |
| |
| |
| /** |
| * Decrement the transactionLevel |
| */ |
| private int decrementTxnLevel() |
| { |
| int nbTxnStarted = context.get() - 1; |
| |
| context.set( nbTxnStarted ); |
| |
| if ( TXN_LOG.isDebugEnabled() ) |
| { |
| TXN_LOG.debug( "Decrementing the TxnLevel : {}", context.get() ); |
| } |
| |
| return nbTxnStarted; |
| } |
| |
| |
| /** |
| * Rollback a transaction |
| */ |
| public void rollback() |
| { |
| // We *must* own the transactionLock |
| if ( !transactionLock.isHeldByCurrentThread() ) |
| { |
| TXN_LOG.error( "This thread does not hold the transactionLock" ); |
| throw new RecordManagerException( "This thread does not hold the transactionLock" ); |
| } |
| |
| if ( TXN_LOG.isDebugEnabled() ) |
| { |
| TXN_LOG.debug( "Rollbacking a new transaction on thread {}, TxnLevel {}", |
| Thread.currentThread().getName(), getTxnLevel() ); |
| } |
| |
| // Reset the counter |
| context.set( ROLLBACKED_TXN ); |
| |
| // We can now free allocated pages, this is the end of the transaction |
| for ( PageIO pageIo : allocatedPages ) |
| { |
| try |
| { |
| free( pageIo ); |
| } |
| catch ( IOException ioe ) |
| { |
| throw new RecordManagerException( ioe.getMessage() ); |
| } |
| } |
| |
| // Release the allocated and freed pages list |
| freedPages.clear(); |
| allocatedPages.clear(); |
| |
| // And update the RMHeader |
| updateRecordManagerHeader(); |
| |
| // And restore the BTreeHeaders new Map to the current state |
| revertBtreeHeaders(); |
| |
| // This is an all-of-nothing operation : we can't have a transaction within |
| // a transaction that would survive an inner transaction rollback. |
| transactionLock.unlock(); |
| } |
| |
| |
    /**
     * Reads all the PageIOs that are linked to the page at the given position, including
     * the first page.
     *
     * @param position The position of the first page
     * @param limit The maximum bytes to read. Set this value to -1 when the size is unknown.
     * @return An array of pages
     * @throws EndOfFileExceededException if a linked page lies past the end of the file
     * @throws IOException if the pages cannot be read
     */
    /*no qualifier*/PageIO[] readPageIOs( long position, long limit ) throws IOException, EndOfFileExceededException
    {
        LOG.debug( "Read PageIOs at position {}", position );

        // A non-positive limit means "read the whole chain"
        if ( limit <= 0 )
        {
            limit = Long.MAX_VALUE;
        }

        PageIO firstPage = fetchPage( position );
        firstPage.setSize();
        List<PageIO> listPages = new ArrayList<PageIO>();
        listPages.add( firstPage );
        // The first page carries both the next-page link (LONG_SIZE) and the data
        // size field (INT_SIZE), so its payload is smaller than a follow-up page's
        long dataRead = pageSize - LONG_SIZE - INT_SIZE;

        // Iterate on the pages, if needed
        long nextPage = firstPage.getNextPage();

        if ( ( dataRead < limit ) && ( nextPage != NO_PAGE ) )
        {
            while ( dataRead < limit )
            {
                PageIO page = fetchPage( nextPage );
                listPages.add( page );
                nextPage = page.getNextPage();
                // Follow-up pages only carry the next-page link
                dataRead += pageSize - LONG_SIZE;

                if ( nextPage == NO_PAGE )
                {
                    page.setNextPage( NO_PAGE );
                    break;
                }
            }
        }

        LOG.debug( "Nb of PageIOs read : {}", listPages.size() );

        // Return
        return listPages.toArray( new PageIO[]
            {} );
    }
| |
| |
| /** |
| * Check the offset to be sure it's a valid one : |
| * <ul> |
| * <li>It's >= 0</li> |
| * <li>It's below the end of the file</li> |
| * <li>It's a multipl of the pageSize |
| * </ul> |
| * @param offset The offset to check |
| * @throws InvalidOffsetException If the offset is not valid |
| */ |
| /* no qualifier */void checkOffset( long offset ) |
| { |
| if ( ( offset < 0 ) || ( offset > endOfFileOffset ) || ( ( offset % pageSize ) != 0 ) ) |
| { |
| throw new InvalidOffsetException( "Bad Offset : " + offset ); |
| } |
| } |
| |
| |
    /**
     * Read a B-tree from the disk. The meta-data are at the given position in the list of pages.
     * We load a B-tree in two steps : first, we load the B-tree header, then the common informations.
     * Convenience overload delegating to the three-argument form with no parent B-tree.
     *
     * @param pageIos The list of pages containing the meta-data
     * @param btree The B-tree we have to initialize
     * @throws InstantiationException
     * @throws IllegalAccessException
     * @throws ClassNotFoundException
     * @throws NoSuchFieldException
     * @throws SecurityException
     * @throws IllegalArgumentException
     */
    private <K, V> void loadBtree( PageIO[] pageIos, BTree<K, V> btree ) throws EndOfFileExceededException,
        IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException,
        SecurityException, NoSuchFieldException
    {
        loadBtree( pageIos, btree, null );
    }
| |
| |
| /** |
| * Read a B-tree from the disk. The meta-data are at the given position in the list of pages. |
| * We load a B-tree in two steps : first, we load the B-tree header, then the common informations |
| * |
| * @param pageIos The list of pages containing the meta-data |
| * @param btree The B-tree we have to initialize |
| * @throws InstantiationException |
| * @throws IllegalAccessException |
| * @throws ClassNotFoundException |
| * @throws NoSuchFieldException |
| * @throws SecurityException |
| * @throws IllegalArgumentException |
| */ |
| /* no qualifier */<K, V> void loadBtree( PageIO[] pageIos, BTree btree, BTree<K, V> parentBTree ) |
| throws EndOfFileExceededException, |
| IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException, |
| SecurityException, NoSuchFieldException |
| { |
| long dataPos = 0L; |
| |
| // Process the B-tree header |
| BTreeHeader<K, V> btreeHeader = new BTreeHeader<K, V>(); |
| btreeHeader.setBtree( btree ); |
| |
| // The BtreeHeader offset |
| btreeHeader.setBTreeHeaderOffset( pageIos[0].getOffset() ); |
| |
| // The B-tree current revision |
| long revision = readLong( pageIos, dataPos ); |
| btreeHeader.setRevision( revision ); |
| dataPos += LONG_SIZE; |
| |
| // The nb elems in the tree |
| long nbElems = readLong( pageIos, dataPos ); |
| btreeHeader.setNbElems( nbElems ); |
| dataPos += LONG_SIZE; |
| |
| // The B-tree rootPage offset |
| long rootPageOffset = readLong( pageIos, dataPos ); |
| btreeHeader.setRootPageOffset( rootPageOffset ); |
| dataPos += LONG_SIZE; |
| |
| // The B-tree information offset |
| long btreeInfoOffset = readLong( pageIos, dataPos ); |
| |
| // Now, process the common informations |
| PageIO[] infoPageIos = readPageIOs( btreeInfoOffset, Long.MAX_VALUE ); |
| ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( infoPageIos[0].getOffset() ); |
| dataPos = 0L; |
| |
| // The B-tree page size |
| int btreePageSize = readInt( infoPageIos, dataPos ); |
| BTreeFactory.setPageSize( btree, btreePageSize ); |
| dataPos += INT_SIZE; |
| |
| // The tree name |
| ByteBuffer btreeNameBytes = readBytes( infoPageIos, dataPos ); |
| dataPos += INT_SIZE + btreeNameBytes.limit(); |
| String btreeName = Strings.utf8ToString( btreeNameBytes ); |
| BTreeFactory.setName( btree, btreeName ); |
| |
| // The keySerializer FQCN |
| ByteBuffer keySerializerBytes = readBytes( infoPageIos, dataPos ); |
| dataPos += INT_SIZE + keySerializerBytes.limit(); |
| |
| String keySerializerFqcn = ""; |
| |
| if ( keySerializerBytes != null ) |
| { |
| keySerializerFqcn = Strings.utf8ToString( keySerializerBytes ); |
| } |
| |
| BTreeFactory.setKeySerializer( btree, keySerializerFqcn ); |
| |
| // The valueSerialier FQCN |
| ByteBuffer valueSerializerBytes = readBytes( infoPageIos, dataPos ); |
| |
| String valueSerializerFqcn = ""; |
| dataPos += INT_SIZE + valueSerializerBytes.limit(); |
| |
| if ( valueSerializerBytes != null ) |
| { |
| valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes ); |
| } |
| |
| BTreeFactory.setValueSerializer( btree, valueSerializerFqcn ); |
| |
| // The B-tree allowDuplicates flag |
| int allowDuplicates = readInt( infoPageIos, dataPos ); |
| ( ( PersistedBTree<K, V> ) btree ).setAllowDuplicates( allowDuplicates != 0 ); |
| dataPos += INT_SIZE; |
| |
| // Set the recordManager in the btree |
| ( ( PersistedBTree<K, V> ) btree ).setRecordManager( this ); |
| |
| // Set the current revision to the one stored in the B-tree header |
| // Here, we have to tell the BTree to keep this revision in the |
| // btreeRevisions Map, thus the 'true' parameter at the end. |
| ( ( PersistedBTree<K, V> ) btree ).storeRevision( btreeHeader, true ); |
| |
| // Now, init the B-tree |
| ( ( PersistedBTree<K, V> ) btree ).init( parentBTree ); |
| |
| // Update the BtreeHeaders Maps |
| currentBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() ); |
| newBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() ); |
| |
| // Read the rootPage pages on disk |
| PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE ); |
| |
| Page<K, V> btreeRoot = readPage( btree, rootPageIos ); |
| BTreeFactory.setRecordManager( btree, this ); |
| |
| BTreeFactory.setRootPage( btree, btreeRoot ); |
| } |
| |
| |
| /** |
     * Deserialize a Page from a B-tree at a given position
| * |
| * @param btree The B-tree we want to read a Page from |
| * @param offset The position in the file for this page |
| * @return The read page |
| * @throws EndOfFileExceededException If we have reached the end of the file while reading the page |
| */ |
| public <K, V> Page<K, V> deserialize( BTree<K, V> btree, long offset ) throws EndOfFileExceededException, |
| IOException |
| { |
| checkOffset( offset ); |
| PageIO[] rootPageIos = readPageIOs( offset, Long.MAX_VALUE ); |
| |
| Page<K, V> page = readPage( btree, rootPageIos ); |
| |
| return page; |
| } |
| |
| |
| /** |
| * Read a page from some PageIO for a given B-tree |
| * @param btree The B-tree we want to read a page for |
| * @param pageIos The PageIO containing the raw data |
| * @return The read Page if successful |
| * @throws IOException If the deserialization failed |
| */ |
| private <K, V> Page<K, V> readPage( BTree<K, V> btree, PageIO[] pageIos ) throws IOException |
| { |
| // Deserialize the rootPage now |
| long position = 0L; |
| |
| // The revision |
| long revision = readLong( pageIos, position ); |
| position += LONG_SIZE; |
| |
| // The number of elements in the page |
| int nbElems = readInt( pageIos, position ); |
| position += INT_SIZE; |
| |
| // The size of the data containing the keys and values |
| Page<K, V> page = null; |
| |
| // Reads the bytes containing all the keys and values, if we have some |
| // We read big blog of data into ByteBuffer, then we will process |
| // this ByteBuffer |
| ByteBuffer byteBuffer = readBytes( pageIos, position ); |
| |
| // Now, deserialize the data block. If the number of elements |
| // is positive, it's a Leaf, otherwise it's a Node |
| // Note that only a leaf can have 0 elements, and it's the root page then. |
| if ( nbElems >= 0 ) |
| { |
| // It's a leaf |
| page = readLeafKeysAndValues( btree, nbElems, revision, byteBuffer, pageIos ); |
| } |
| else |
| { |
| // It's a node |
| page = readNodeKeysAndValues( btree, -nbElems, revision, byteBuffer, pageIos ); |
| } |
| |
| ( ( AbstractPage<K, V> ) page ).setOffset( pageIos[0].getOffset() ); |
| if ( pageIos.length > 1 ) |
| { |
| ( ( AbstractPage<K, V> ) page ).setLastOffset( pageIos[pageIos.length - 1].getOffset() ); |
| } |
| |
| return page; |
| } |
| |
| |
| /** |
| * Deserialize a Leaf from some PageIOs |
| */ |
    private <K, V> PersistedLeaf<K, V> readLeafKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
        ByteBuffer byteBuffer, PageIO[] pageIos )
    {
        // Its a leaf, create it
        PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) BTreeFactory.createLeaf( btree, revision, nbElems );

        // Store the page offset on disk
        leaf.setOffset( pageIos[0].getOffset() );
        leaf.setLastOffset( pageIos[pageIos.length - 1].getOffset() );

        int[] keyLengths = new int[nbElems];
        int[] valueLengths = new int[nbElems];

        // Sub-btrees (PERSISTED_SUB) serialize keys only : no value block is
        // present in the buffer for them (see serializePage, which skips
        // serializeLeafValue for that type)
        boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );

        // Read each key and value
        for ( int i = 0; i < nbElems; i++ )
        {
            if ( isNotSubTree )
            {
                // Read the number of values. A negative count means the values
                // live in a sub-btree, a positive one means they are inlined
                int nbValues = byteBuffer.getInt();
                PersistedValueHolder<V> valueHolder = null;

                if ( nbValues < 0 )
                {
                    // This is a sub-btree : only its offset is stored here
                    byte[] btreeOffsetBytes = new byte[LONG_SIZE];
                    byteBuffer.get( btreeOffsetBytes );

                    // Create the valueHolder. As the number of values is negative, we have to switch
                    // to a positive value but as we start at -1 for 0 value, add 1.
                    // NOTE(review): serializeLeafValue stores -( nbValues + 1 ), whose inverse is
                    // -nbValues - 1, not 1 - nbValues as computed below -- confirm the expected
                    // count convention against PersistedValueHolder before changing anything.
                    valueHolder = new PersistedValueHolder<V>( btree, 1 - nbValues, btreeOffsetBytes );
                }
                else
                {
                    // This is an array
                    // Read the value's array length
                    valueLengths[i] = byteBuffer.getInt();

                    // This is an Array of values, read the byte[] associated with it
                    byte[] arrayBytes = new byte[valueLengths[i]];
                    byteBuffer.get( arrayBytes );
                    valueHolder = new PersistedValueHolder<V>( btree, nbValues, arrayBytes );
                }

                BTreeFactory.setValue( btree, leaf, i, valueHolder );
            }

            // The key : its serialized length followed by the raw bytes,
            // deserialized lazily by the key holder
            keyLengths[i] = byteBuffer.getInt();
            byte[] data = new byte[keyLengths[i]];
            byteBuffer.get( data );
            BTreeFactory.setKey( btree, leaf, i, data );
        }

        return leaf;
    }
| |
| |
| /** |
| * Deserialize a Node from some PageIos |
| */ |
| private <K, V> PersistedNode<K, V> readNodeKeysAndValues( BTree<K, V> btree, int nbElems, long revision, |
| ByteBuffer byteBuffer, PageIO[] pageIos ) throws IOException |
| { |
| PersistedNode<K, V> node = ( PersistedNode<K, V> ) BTreeFactory.createNode( btree, revision, nbElems ); |
| |
| // Read each value and key |
| for ( int i = 0; i < nbElems; i++ ) |
| { |
| // This is an Offset |
| long offset = LongSerializer.INSTANCE.deserialize( byteBuffer ); |
| long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer ); |
| |
| PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset ); |
| node.setValue( i, valueHolder ); |
| |
| // Read the key length |
| int keyLength = byteBuffer.getInt(); |
| |
| int currentPosition = byteBuffer.position(); |
| |
| // and the key value |
| K key = btree.getKeySerializer().deserialize( byteBuffer ); |
| |
| // Set the new position now |
| byteBuffer.position( currentPosition + keyLength ); |
| |
| BTreeFactory.setKey( btree, node, i, key ); |
| } |
| |
| // and read the last value, as it's a node |
| long offset = LongSerializer.INSTANCE.deserialize( byteBuffer ); |
| long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer ); |
| |
| PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset ); |
| node.setValue( nbElems, valueHolder ); |
| |
| return node; |
| } |
| |
| |
| /** |
| * Read a byte[] from pages. |
| * |
| * @param pageIos The pages we want to read the byte[] from |
| * @param position The position in the data stored in those pages |
| * @return The byte[] we have read |
| */ |
    /* no qualifier */ByteBuffer readBytes( PageIO[] pageIos, long position )
    {
        // Read the byte[] length first
        int length = readInt( pageIos, position );
        position += INT_SIZE;

        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page : the logical position plus
        // the accumulated per-page overhead (next-page link and size header),
        // minus the bytes consumed by the previous pages
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        // Check that the length is correct : it should fit in the provided pageIos
        // NOTE(review): position + length is one-past-the-end, so a value equal to
        // pageIos.length may be legal when the data ends exactly on a page boundary;
        // confirm computePageNb's contract before tightening this to >=.
        int pageEnd = computePageNb( position + length );

        if ( pageEnd > pageIos.length )
        {
            // This is wrong...
            LOG.error( "Wrong size : {}, it's larger than the number of provided pages {}", length, pageIos.length );
            throw new ArrayIndexOutOfBoundsException();
        }

        ByteBuffer pageData = pageIos[pageNb].getData();
        int remaining = pageData.capacity() - pagePos;

        if ( length == 0 )
        {
            // No bytes to read : return null;
            return null;
        }
        else
        {
            // Accumulate the bytes, page after page, into one buffer
            ByteBuffer bytes = ByteBuffer.allocate( length );

            while ( length > 0 )
            {
                if ( length <= remaining )
                {
                    // The rest of the data fits in the current page : copy the
                    // [pagePos, pagePos + length) window, restoring the page
                    // buffer's limit and position afterwards (mark/reset)
                    pageData.mark();
                    pageData.position( pagePos );
                    int oldLimit = pageData.limit();
                    pageData.limit( pagePos + length );
                    bytes.put( pageData );
                    pageData.limit( oldLimit );
                    pageData.reset();
                    bytes.rewind();

                    return bytes;
                }

                // The data spills over : drain what remains of this page...
                pageData.mark();
                pageData.position( pagePos );
                int oldLimit = pageData.limit();
                pageData.limit( pagePos + remaining );
                bytes.put( pageData );
                pageData.limit( oldLimit );
                pageData.reset();
                // ...then move to the next chained page, skipping its link header
                pageNb++;
                pagePos = LINK_SIZE;
                pageData = pageIos[pageNb].getData();
                length -= remaining;
                remaining = pageData.capacity() - pagePos;
            }

            bytes.rewind();

            return bytes;
        }
    }
| |
| |
| /** |
| * Read an int from pages |
| * @param pageIos The pages we want to read the int from |
| * @param position The position in the data stored in those pages |
| * @return The int we have read |
| */ |
    /* no qualifier */int readInt( PageIO[] pageIos, long position )
    {
        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page, accounting for the
        // per-page link and size headers
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        ByteBuffer pageData = pageIos[pageNb].getData();
        int remaining = pageData.capacity() - pagePos;
        int value = 0;

        if ( remaining >= INT_SIZE )
        {
            // Fast path : the whole int lies within the current page
            value = pageData.getInt( pagePos );
        }
        else
        {
            // The int straddles two pages : rebuild it big-endian, byte by byte.
            // First the 'remaining' high-order bytes from the current page...
            value = 0;

            switch ( remaining )
            {
                case 3:
                    value += ( ( pageData.get( pagePos + 2 ) & 0x00FF ) << 8 );
                    // Fallthrough !!!

                case 2:
                    value += ( ( pageData.get( pagePos + 1 ) & 0x00FF ) << 16 );
                    // Fallthrough !!!

                case 1:
                    // No mask needed : the sign-extended bits are shifted out of the int
                    value += ( pageData.get( pagePos ) << 24 );
                    break;
            }

            // Now deal with the next page : the 4 - remaining low-order bytes,
            // read right after the next page's link header
            pageData = pageIos[pageNb + 1].getData();
            pagePos = LINK_SIZE;

            switch ( remaining )
            {
                case 1:
                    value += ( pageData.get( pagePos ) & 0x00FF ) << 16;
                    // fallthrough !!!

                case 2:
                    value += ( pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 8;
                    // fallthrough !!!

                case 3:
                    value += ( pageData.get( pagePos + 3 - remaining ) & 0x00FF );
                    break;
            }
        }

        return value;
    }
| |
| |
| /** |
| * Read a byte from pages |
| * @param pageIos The pages we want to read the byte from |
| * @param position The position in the data stored in those pages |
| * @return The byte we have read |
| */ |
| private byte readByte( PageIO[] pageIos, long position ) |
| { |
| // Compute the page in which we will store the data given the |
| // current position |
| int pageNb = computePageNb( position ); |
| |
| // Compute the position in the current page |
| int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize; |
| |
| ByteBuffer pageData = pageIos[pageNb].getData(); |
| byte value = 0; |
| |
| value = pageData.get( pagePos ); |
| |
| return value; |
| } |
| |
| |
| /** |
| * Read a long from pages |
| * @param pageIos The pages we want to read the long from |
| * @param position The position in the data stored in those pages |
| * @return The long we have read |
| */ |
    /* no qualifier */long readLong( PageIO[] pageIos, long position )
    {
        // Compute the page in which we will store the data given the
        // current position
        int pageNb = computePageNb( position );

        // Compute the position in the current page, accounting for the
        // per-page link and size headers
        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;

        ByteBuffer pageData = pageIos[pageNb].getData();
        int remaining = pageData.capacity() - pagePos;
        long value = 0L;

        if ( remaining >= LONG_SIZE )
        {
            // Fast path : the whole long lies within the current page
            value = pageData.getLong( pagePos );
        }
        else
        {
            // The long straddles two pages : rebuild it big-endian, byte by byte.
            // First the 'remaining' high-order bytes from the current page...
            switch ( remaining )
            {
                case 7:
                    value += ( ( ( long ) pageData.get( pagePos + 6 ) & 0x00FF ) << 8 );
                    // Fallthrough !!!

                case 6:
                    value += ( ( ( long ) pageData.get( pagePos + 5 ) & 0x00FF ) << 16 );
                    // Fallthrough !!!

                case 5:
                    value += ( ( ( long ) pageData.get( pagePos + 4 ) & 0x00FF ) << 24 );
                    // Fallthrough !!!

                case 4:
                    value += ( ( ( long ) pageData.get( pagePos + 3 ) & 0x00FF ) << 32 );
                    // Fallthrough !!!

                case 3:
                    value += ( ( ( long ) pageData.get( pagePos + 2 ) & 0x00FF ) << 40 );
                    // Fallthrough !!!

                case 2:
                    value += ( ( ( long ) pageData.get( pagePos + 1 ) & 0x00FF ) << 48 );
                    // Fallthrough !!!

                case 1:
                    // No mask needed : the sign-extended bits are shifted out of the long
                    value += ( ( long ) pageData.get( pagePos ) << 56 );
                    break;
            }

            // Now deal with the next page : the 8 - remaining low-order bytes,
            // read right after the next page's link header
            pageData = pageIos[pageNb + 1].getData();
            pagePos = LINK_SIZE;

            switch ( remaining )
            {
                case 1:
                    value += ( ( long ) pageData.get( pagePos ) & 0x00FF ) << 48;
                    // fallthrough !!!

                case 2:
                    value += ( ( long ) pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 40;
                    // fallthrough !!!

                case 3:
                    value += ( ( long ) pageData.get( pagePos + 3 - remaining ) & 0x00FF ) << 32;
                    // fallthrough !!!

                case 4:
                    value += ( ( long ) pageData.get( pagePos + 4 - remaining ) & 0x00FF ) << 24;
                    // fallthrough !!!

                case 5:
                    value += ( ( long ) pageData.get( pagePos + 5 - remaining ) & 0x00FF ) << 16;
                    // fallthrough !!!

                case 6:
                    value += ( ( long ) pageData.get( pagePos + 6 - remaining ) & 0x00FF ) << 8;
                    // fallthrough !!!

                case 7:
                    value += ( ( long ) pageData.get( pagePos + 7 - remaining ) & 0x00FF );
                    break;
            }
        }

        return value;
    }
| |
| |
| /** |
| * Manage a B-tree. The btree will be added and managed by this RecordManager. We will create a |
| * new RootPage for this added B-tree, which will contain no data.<br/> |
| * This method is threadsafe. |
 * Managing a btree is a matter of storing a reference to the managed B-tree in the B-tree Of B-trees.
 * We store a tuple of NameRevision (where revision is 0L) and an offset to the B-tree header.
 * At the same time, we keep track of the managed B-trees in a Map.
 *
 * @param btree The new B-tree to manage.
| * |
| * @throws BTreeAlreadyManagedException If the B-tree is already managed |
| * @throws IOException if there was a problem while accessing the file |
| */ |
| public synchronized <K, V> void manage( BTree<K, V> btree ) throws BTreeAlreadyManagedException, IOException |
| { |
| beginTransaction(); |
| |
| try |
| { |
| LOG.debug( "Managing the btree {}", btree.getName() ); |
| BTreeFactory.setRecordManager( btree, this ); |
| |
| String name = btree.getName(); |
| |
| if ( managedBtrees.containsKey( name ) ) |
| { |
| // There is already a B-tree with this name in the recordManager... |
| LOG.error( "There is already a B-tree named '{}' managed by this recordManager", name ); |
| rollback(); |
| throw new BTreeAlreadyManagedException( name ); |
| } |
| |
| // Now, write the B-tree informations |
| long btreeInfoOffset = writeBtreeInfo( btree ); |
| BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader(); |
| ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( btreeInfoOffset ); |
| |
| // Serialize the B-tree root page |
| Page<K, V> rootPage = btreeHeader.getRootPage(); |
| |
| PageIO[] rootPageIos = serializePage( btree, btreeHeader.getRevision(), rootPage ); |
| |
| // Get the reference on the first page |
| long rootPageOffset = rootPageIos[0].getOffset(); |
| |
| // Store the rootPageOffset into the Btree header and into the rootPage |
| btreeHeader.setRootPageOffset( rootPageOffset ); |
| ( ( PersistedLeaf<K, V> ) rootPage ).setOffset( rootPageOffset ); |
| |
| LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() ); |
| flushPages( rootPageIos ); |
| |
| // And the B-tree header |
| long btreeHeaderOffset = writeBtreeHeader( btree, btreeHeader ); |
| |
| // Now, if this is a new B-tree, add it to the B-tree of B-trees |
| // Add the btree into the map of managed B-trees |
| managedBtrees.put( name, ( BTree<Object, Object> ) btree ); |
| |
| // And in the Map of currentBtreeHeaders and newBtreeHeaders |
| currentBTreeHeaders.put( name, btreeHeader ); |
| newBTreeHeaders.put( name, btreeHeader ); |
| |
| // We can safely increment the number of managed B-trees |
| nbBtree++; |
| |
| // Create the new NameRevision |
| NameRevision nameRevision = new NameRevision( name, 0L ); |
| |
| // Inject it into the B-tree of B-tree |
| btreeOfBtrees.insert( nameRevision, btreeHeaderOffset ); |
| commit(); |
| } |
| catch ( IOException ioe ) |
| { |
| rollback(); |
| throw ioe; |
| } |
| } |
| |
| |
| /** |
 * Managing a btree is a matter of storing a reference to the managed B-tree in the B-tree Of B-trees.
 * We store a tuple of NameRevision (where revision is 0L) and an offset to the B-tree header.
 * At the same time, we keep track of the managed B-trees in a Map.
 *
 * @param btree The new B-tree to manage.
| * |
| * @throws BTreeAlreadyManagedException If the B-tree is already managed |
| * @throws IOException |
| */ |
| public synchronized <K, V> void manageSubBtree( BTree<K, V> btree ) |
| throws BTreeAlreadyManagedException, IOException |
| { |
| LOG.debug( "Managing the sub-btree {}", btree.getName() ); |
| BTreeFactory.setRecordManager( btree, this ); |
| |
| String name = btree.getName(); |
| |
| if ( managedBtrees.containsKey( name ) ) |
| { |
| // There is already a subB-tree with this name in the recordManager... |
| LOG.error( "There is already a sub-B-tree named '{}' managed by this recordManager", name ); |
| throw new BTreeAlreadyManagedException( name ); |
| } |
| |
| // Now, write the subB-tree informations |
| long btreeInfoOffset = writeBtreeInfo( btree ); |
| BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader(); |
| ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( btreeInfoOffset ); |
| |
| // Serialize the B-tree root page |
| Page<K, V> rootPage = btreeHeader.getRootPage(); |
| |
| PageIO[] rootPageIos = serializePage( btree, btreeHeader.getRevision(), rootPage ); |
| |
| // Get the reference on the first page |
| long rootPageOffset = rootPageIos[0].getOffset(); |
| |
| // Store the rootPageOffset into the Btree header and into the rootPage |
| btreeHeader.setRootPageOffset( rootPageOffset ); |
| |
| ( ( AbstractPage<K, V> ) rootPage ).setOffset( rootPageOffset ); |
| |
| LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() ); |
| flushPages( rootPageIos ); |
| |
| // And the B-tree header |
| long btreeHeaderOffset = writeBtreeHeader( btree, btreeHeader ); |
| |
| // Now, if this is a new B-tree, add it to the B-tree of B-trees |
| // Add the btree into the map of managed B-trees |
| if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES ) && ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB ) ) |
| { |
| managedBtrees.put( name, ( BTree<Object, Object> ) btree ); |
| } |
| |
| // And in the Map of currentBtreeHeaders and newBtreeHeaders |
| currentBTreeHeaders.put( name, btreeHeader ); |
| newBTreeHeaders.put( name, btreeHeader ); |
| |
| // Create the new NameRevision |
| NameRevision nameRevision = new NameRevision( name, 0L ); |
| |
| // Inject it into the B-tree of B-tree |
| if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES ) && ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB ) ) |
| { |
| // We can safely increment the number of managed B-trees |
| nbBtree++; |
| |
| btreeOfBtrees.insert( nameRevision, btreeHeaderOffset ); |
| } |
| |
| updateRecordManagerHeader(); |
| } |
| |
| |
| /** |
| * Serialize a new Page. It will contain the following data :<br/> |
| * <ul> |
| * <li>the revision : a long</li> |
 * <li>the number of elements : an int (negative for a Node, 0 or positive for a Leaf)</li>
| * <li>the size of the values/keys when serialized |
| * <li>the keys : an array of serialized keys</li> |
| * <li>the values : an array of references to the children pageIO offset (stored as long) |
| * if it's a Node, or a list of values if it's a Leaf</li> |
| * <li></li> |
| * </ul> |
| * |
| * @param revision The node revision |
| * @param keys The keys to serialize |
| * @param children The references to the children |
| * @return An array of pages containing the serialized node |
| * @throws IOException |
| */ |
    private <K, V> PageIO[] serializePage( BTree<K, V> btree, long revision, Page<K, V> page ) throws IOException
    {
        int nbElems = page.getNbElems();

        // Sub-btrees (PERSISTED_SUB) serialize keys only : their leaf values are skipped
        boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );

        if ( nbElems == 0 )
        {
            // The empty root page gets a dedicated, minimal serialization
            return serializeRootPage( revision );
        }
        else
        {
            // Prepare a list of byte[] that will contain the serialized page.
            // The count is only a capacity hint for the ArrayList below.
            int nbBuffers = 1 + 1 + 1 + nbElems * 3;
            int dataSize = 0;
            int serializedSize = 0;

            if ( page.isNode() )
            {
                // A Node has one more value to store
                nbBuffers++;
            }

            // Now, we can create the list with the right size
            List<byte[]> serializedData = new ArrayList<byte[]>( nbBuffers );

            // The revision
            byte[] buffer = LongSerializer.serialize( revision );
            serializedData.add( buffer );
            serializedSize += buffer.length;

            // The number of elements
            // Make it a negative value if it's a Node
            int pageNbElems = nbElems;

            if ( page.isNode() )
            {
                pageNbElems = -nbElems;
            }

            buffer = IntSerializer.serialize( pageNbElems );
            serializedData.add( buffer );
            serializedSize += buffer.length;

            // Iterate on the keys and values. We first serialize the value, then the key
            // until we are done with all of them. If we are serializing a page, we have
            // to serialize one more value
            for ( int pos = 0; pos < nbElems; pos++ )
            {
                // Start with the value
                if ( page.isNode() )
                {
                    dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, pos, serializedData );
                    dataSize += serializeNodeKey( ( PersistedNode<K, V> ) page, pos, serializedData );
                }
                else
                {
                    if ( isNotSubTree )
                    {
                        dataSize += serializeLeafValue( ( PersistedLeaf<K, V> ) page, pos, serializedData );
                    }

                    dataSize += serializeLeafKey( ( PersistedLeaf<K, V> ) page, pos, serializedData );
                }
            }

            // Nodes have one more value to serialize (the rightmost child reference)
            if ( page.isNode() )
            {
                dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, nbElems, serializedData );
            }

            // Store the data size. It is inserted at index 2, i.e. right after
            // the revision and the element count, ahead of the keys/values data.
            buffer = IntSerializer.serialize( dataSize );
            serializedData.add( 2, buffer );
            serializedSize += buffer.length;

            serializedSize += dataSize;

            // We are done. Allocate the pages we need to store the data
            PageIO[] pageIos = getFreePageIOs( serializedSize );

            // And store the data into those pages
            long position = 0L;

            for ( byte[] bytes : serializedData )
            {
                position = storeRaw( position, bytes, pageIos );
            }

            return pageIos;
        }
    }
| |
| |
| /** |
| * Serialize a Node's key |
| */ |
| private <K, V> int serializeNodeKey( PersistedNode<K, V> node, int pos, List<byte[]> serializedData ) |
| { |
| KeyHolder<K> holder = node.getKeyHolder( pos ); |
| byte[] buffer = ( ( PersistedKeyHolder<K> ) holder ).getRaw(); |
| |
| // We have to store the serialized key length |
| byte[] length = IntSerializer.serialize( buffer.length ); |
| serializedData.add( length ); |
| |
| // And store the serialized key now if not null |
| if ( buffer.length != 0 ) |
| { |
| serializedData.add( buffer ); |
| } |
| |
| return buffer.length + INT_SIZE; |
| } |
| |
| |
| /** |
| * Serialize a Node's Value. We store the two offsets of the child page. |
| */ |
| private <K, V> int serializeNodeValue( PersistedNode<K, V> node, int pos, List<byte[]> serializedData ) |
| throws IOException |
| { |
| // For a node, we just store the children's offsets |
| Page<K, V> child = node.getReference( pos ); |
| |
| // The first offset |
| byte[] buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getOffset() ); |
| serializedData.add( buffer ); |
| int dataSize = buffer.length; |
| |
| // The last offset |
| buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getLastOffset() ); |
| serializedData.add( buffer ); |
| dataSize += buffer.length; |
| |
| return dataSize; |
| } |
| |
| |
| /** |
| * Serialize a Leaf's key |
| */ |
| private <K, V> int serializeLeafKey( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData ) |
| { |
| int dataSize = 0; |
| KeyHolder<K> keyHolder = leaf.getKeyHolder( pos ); |
| byte[] keyData = ( ( PersistedKeyHolder<K> ) keyHolder ).getRaw(); |
| |
| if ( keyData != null ) |
| { |
| // We have to store the serialized key length |
| byte[] length = IntSerializer.serialize( keyData.length ); |
| serializedData.add( length ); |
| |
| // And the key data |
| serializedData.add( keyData ); |
| dataSize += keyData.length + INT_SIZE; |
| } |
| else |
| { |
| serializedData.add( IntSerializer.serialize( 0 ) ); |
| dataSize += INT_SIZE; |
| } |
| |
| return dataSize; |
| } |
| |
| |
| /** |
| * Serialize a Leaf's Value. |
| */ |
| private <K, V> int serializeLeafValue( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData ) |
| throws IOException |
| { |
| // The value can be an Array or a sub-btree, but we don't care |
| // we just iterate on all the values |
| ValueHolder<V> valueHolder = leaf.getValue( pos ); |
| int dataSize = 0; |
| int nbValues = valueHolder.size(); |
| |
| if ( nbValues == 0 ) |
| { |
| // No value. |
| byte[] buffer = IntSerializer.serialize( nbValues ); |
| serializedData.add( buffer ); |
| |
| return buffer.length; |
| } |
| |
| if ( !valueHolder.isSubBtree() ) |
| { |
| // Write the nb elements first |
| byte[] buffer = IntSerializer.serialize( nbValues ); |
| serializedData.add( buffer ); |
| dataSize = INT_SIZE; |
| |
| // We have a serialized value. Just flush it |
| byte[] data = ( ( PersistedValueHolder<V> ) valueHolder ).getRaw(); |
| dataSize += data.length; |
| |
| // Store the data size |
| buffer = IntSerializer.serialize( data.length ); |
| serializedData.add( buffer ); |
| dataSize += INT_SIZE; |
| |
| // and add the data if it's not 0 |
| if ( data.length > 0 ) |
| { |
| serializedData.add( data ); |
| } |
| } |
| else |
| { |
| // Store the nbVlues as a negative number. We add 1 so that 0 is not confused with an Array value |
| byte[] buffer = IntSerializer.serialize( -( nbValues + 1 ) ); |
| serializedData.add( buffer ); |
| dataSize += buffer.length; |
| |
| // the B-tree offset |
| buffer = LongSerializer.serialize( ( ( PersistedValueHolder<V> ) valueHolder ).getOffset() ); |
| serializedData.add( buffer ); |
| dataSize += buffer.length; |
| } |
| |
| return dataSize; |
| } |
| |
| |
| /** |
| * Write a root page with no elements in it |
| */ |
| private PageIO[] serializeRootPage( long revision ) throws IOException |
| { |
| // We will have 1 single page if we have no elements |
| PageIO[] pageIos = new PageIO[1]; |
| |
| // This is either a new root page or a new page that will be filled later |
| PageIO newPage = fetchNewPage(); |
| |
| // We need first to create a byte[] that will contain all the data |
| // For the root page, this is easy, as we only have to store the revision, |
| // and the number of elements, which is 0. |
| long position = 0L; |
| |
| position = store( position, revision, newPage ); |
| position = store( position, 0, newPage ); |
| |
| // Update the page size now |
| newPage.setSize( ( int ) position ); |
| |
| // Insert the result into the array of PageIO |
| pageIos[0] = newPage; |
| |
| return pageIos; |
| } |
| |
| |
| /** |
| * Update the RecordManager header, injecting the following data : |
| * |
| * <pre> |
| * +---------------------+ |
| * | PageSize | 4 bytes : The size of a physical page (default to 4096) |
| * +---------------------+ |
| * | NbTree | 4 bytes : The number of managed B-trees (at least 1) |
| * +---------------------+ |
| * | FirstFree | 8 bytes : The offset of the first free page |
| * +---------------------+ |
| * | current BoB offset | 8 bytes : The offset of the current B-tree of B-trees |
| * +---------------------+ |
| * | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees |
| * +---------------------+ |
| * | current CP offset | 8 bytes : The offset of the current CopiedPages B-tree |
| * +---------------------+ |
| * | previous CP offset | 8 bytes : The offset of the previous CopiedPages B-tree |
| * +---------------------+ |
| * </pre> |
| */ |
| public void updateRecordManagerHeader() |
| { |
| // The page size |
| int position = writeData( RECORD_MANAGER_HEADER_BYTES, 0, pageSize ); |
| |
| // The number of managed B-tree |
| position = writeData( RECORD_MANAGER_HEADER_BYTES, position, nbBtree ); |
| |
| // The first free page |
| position = writeData( RECORD_MANAGER_HEADER_BYTES, position, firstFreePage ); |
| |
| // The offset of the current B-tree of B-trees |
| position = writeData( RECORD_MANAGER_HEADER_BYTES, position, currentBtreeOfBtreesOffset ); |
| |
| // The offset of the copied pages B-tree |
| position = writeData( RECORD_MANAGER_HEADER_BYTES, position, previousBtreeOfBtreesOffset ); |
| |
| // Write the RecordManager header on disk |
| RECORD_MANAGER_HEADER_BUFFER.put( RECORD_MANAGER_HEADER_BYTES ); |
| RECORD_MANAGER_HEADER_BUFFER.flip(); |
| |
| LOG.debug( "Update RM header" ); |
| |
| if ( LOG_PAGES.isDebugEnabled() ) |
| { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append( "First free page : 0x" ).append( Long.toHexString( firstFreePage ) ).append( "\n" ); |
| sb.append( "Current BOB header : 0x" ).append( Long.toHexString( currentBtreeOfBtreesOffset ) ) |
| .append( "\n" ); |
| sb.append( "Previous BOB header : 0x" ).append( Long.toHexString( previousBtreeOfBtreesOffset ) ) |
| .append( "\n" ); |
| |
| if ( firstFreePage != NO_PAGE ) |
| { |
| long freePage = firstFreePage; |
| sb.append( "free pages list : " ); |
| |
| boolean isFirst = true; |
| |
| while ( freePage != NO_PAGE ) |
| { |
| if ( isFirst ) |
| { |
| isFirst = false; |
| } |
| else |
| { |
| sb.append( " -> " ); |
| } |
| |
| sb.append( "0x" ).append( Long.toHexString( freePage ) ); |
| |
| try |
| { |
| PageIO[] freePageIO = readPageIOs( freePage, 8 ); |
| |
| freePage = freePageIO[0].getNextPage(); |
| } |
| catch ( EndOfFileExceededException e ) |
| { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } |
| catch ( IOException e ) |
| { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } |
| } |
| |
| } |
| |
| LOG_PAGES.debug( "Update RM Header : \n{}", sb.toString() ); |
| } |
| |
| try |
| { |
| |
| Integer nbTxnStarted = context.get(); |
| |
| if ( ( nbTxnStarted == null ) || ( nbTxnStarted <= 1 ) ) |
| { |
| //System.out.println( "Writing page at 0000" ); |
| writeCounter.put( 0L, writeCounter.containsKey( 0L ) ? writeCounter.get( 0L ) + 1 : 1 ); |
| fileChannel.write( RECORD_MANAGER_HEADER_BUFFER, 0 ); |
| } |
| } |
| catch ( IOException ioe ) |
| { |
| throw new FileException( ioe.getMessage() ); |
| } |
| |
| RECORD_MANAGER_HEADER_BUFFER.clear(); |
| |
| // Reset the old versions |
| previousBtreeOfBtreesOffset = -1L; |
| |
| nbUpdateRMHeader.incrementAndGet(); |
| } |
| |
| |
| /** |
| * Update the RecordManager header, injecting the following data : |
| * |
| * <pre> |
| * +---------------------+ |
| * | PageSize | 4 bytes : The size of a physical page (default to 4096) |
| * +---------------------+ |
| * | NbTree | 4 bytes : The number of managed B-trees (at least 1) |
| * +---------------------+ |
| * | FirstFree | 8 bytes : The offset of the first free page |
| * +---------------------+ |
| * | current BoB offset | 8 bytes : The offset of the current B-tree of B-trees |
| * +---------------------+ |
| * | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees |
| * +---------------------+ |
| * | current CP offset | 8 bytes : The offset of the current CopiedPages B-tree |
| * +---------------------+ |
| * | previous CP offset | 8 bytes : The offset of the previous CopiedPages B-tree |
| * +---------------------+ |
| * </pre> |
| */ |
| public void updateRecordManagerHeader( long newBtreeOfBtreesOffset, long newCopiedPageBtreeOffset ) |
| { |
| if ( newBtreeOfBtreesOffset != -1L ) |
| { |
| previousBtreeOfBtreesOffset = currentBtreeOfBtreesOffset; |
| currentBtreeOfBtreesOffset = newBtreeOfBtreesOffset; |
| } |
| } |
| |
| |
| /** |
| * Inject an int into a byte[] at a given position. |
| */ |
| private int writeData( byte[] buffer, int position, int value ) |
| { |
| RECORD_MANAGER_HEADER_BYTES[position] = ( byte ) ( value >>> 24 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 1] = ( byte ) ( value >>> 16 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 2] = ( byte ) ( value >>> 8 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 3] = ( byte ) ( value ); |
| |
| return position + 4; |
| } |
| |
| |
| /** |
| * Inject a long into a byte[] at a given position. |
| */ |
| private int writeData( byte[] buffer, int position, long value ) |
| { |
| RECORD_MANAGER_HEADER_BYTES[position] = ( byte ) ( value >>> 56 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 1] = ( byte ) ( value >>> 48 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 2] = ( byte ) ( value >>> 40 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 3] = ( byte ) ( value >>> 32 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 4] = ( byte ) ( value >>> 24 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 5] = ( byte ) ( value >>> 16 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 6] = ( byte ) ( value >>> 8 ); |
| RECORD_MANAGER_HEADER_BYTES[position + 7] = ( byte ) ( value ); |
| |
| return position + 8; |
| } |
| |
| |
| /** |
| * Add a new <btree, revision> tuple into the B-tree of B-trees. |
| * |
| * @param name The B-tree name |
| * @param revision The B-tree revision |
| * @param btreeHeaderOffset The B-tree offset |
| * @throws IOException If the update failed |
| */ |
| /* no qualifier */<K, V> void addInBtreeOfBtrees( String name, long revision, long btreeHeaderOffset ) |
| throws IOException |
| { |
| checkOffset( btreeHeaderOffset ); |
| NameRevision nameRevision = new NameRevision( name, revision ); |
| |
| btreeOfBtrees.insert( nameRevision, btreeHeaderOffset ); |
| |
| // Update the B-tree of B-trees offset |
| currentBtreeOfBtreesOffset = getNewBTreeHeader( BTREE_OF_BTREES_NAME ).getBTreeHeaderOffset(); |
| } |
| |
| |
| /** |
| * Add a new <btree, revision> tuple into the CopiedPages B-tree. |
| * |
| * @param name The B-tree name |
| * @param revision The B-tree revision |
| * @param btreeHeaderOffset The B-tree offset |
| * @throws IOException If the update failed |
| */ |
| /* no qualifier */<K, V> void addInCopiedPagesBtree( String name, long revision, List<Page<K, V>> pages ) |
| throws IOException |
| { |
| RevisionName revisionName = new RevisionName( revision, name ); |
| |
| long[] pageOffsets = new long[pages.size()]; |
| int pos = 0; |
| |
| for ( Page<K, V> page : pages ) |
| { |
| pageOffsets[pos++] = ( ( AbstractPage<K, V> ) page ).getOffset(); |
| } |
| |
| copiedPageMap.put( revisionName, pageOffsets ); |
| } |
| |
| |
| /** |
| * Internal method used to update the B-tree of B-trees offset |
| * @param btreeOfBtreesOffset The new offset |
| */ |
| /* no qualifier */void setBtreeOfBtreesOffset( long btreeOfBtreesOffset ) |
| { |
| checkOffset( btreeOfBtreesOffset ); |
| this.currentBtreeOfBtreesOffset = btreeOfBtreesOffset; |
| } |
| |
| |
| /** |
| * Write the B-tree header on disk. We will write the following informations : |
| * <pre> |
| * +------------+ |
| * | revision | The B-tree revision |
| * +------------+ |
| * | nbElems | The B-tree number of elements |
| * +------------+ |
| * | rootPage | The root page offset |
| * +------------+ |
| * | BtreeInfo | The B-tree info offset |
| * +------------+ |
| * </pre> |
| * @param btree The B-tree which header has to be written |
| * @param btreeInfoOffset The offset of the B-tree informations |
| * @return The B-tree header offset |
| * @throws IOException If we weren't able to write the B-tree header |
| */ |
| /* no qualifier */<K, V> long writeBtreeHeader( BTree<K, V> btree, BTreeHeader<K, V> btreeHeader ) |
| throws IOException |
| { |
| int bufferSize = |
| LONG_SIZE + // The revision |
| LONG_SIZE + // the number of element |
| LONG_SIZE + // The root page offset |
| LONG_SIZE; // The B-tree info page offset |
| |
| // Get the pageIOs we need to store the data. We may need more than one. |
| PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize ); |
| |
| // Store the B-tree header Offset into the B-tree |
| long btreeHeaderOffset = btreeHeaderPageIos[0].getOffset(); |
| |
| // Now store the B-tree data in the pages : |
| // - the B-tree revision |
| // - the B-tree number of elements |
| // - the B-tree root page offset |
| // - the B-tree info page offset |
| // Starts at 0 |
| long position = 0L; |
| |
| // The B-tree current revision |
| position = store( position, btreeHeader.getRevision(), btreeHeaderPageIos ); |
| |
| // The nb elems in the tree |
| position = store( position, btreeHeader.getNbElems(), btreeHeaderPageIos ); |
| |
| // Now, we can inject the B-tree rootPage offset into the B-tree header |
| position = store( position, btreeHeader.getRootPageOffset(), btreeHeaderPageIos ); |
| |
| // The B-tree info page offset |
| position = store( position, ( ( PersistedBTree<K, V> ) btree ).getBtreeInfoOffset(), btreeHeaderPageIos ); |
| |
| // And flush the pages to disk now |
| LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() ); |
| |
| if ( LOG_PAGES.isDebugEnabled() ) |
| { |
| LOG_PAGES.debug( "Writing BTreeHeader revision {} for {}", btreeHeader.getRevision(), btree.getName() ); |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append( "Offset : " ).append( Long.toHexString( btreeHeaderOffset ) ).append( "\n" ); |
| sb.append( " Revision : " ).append( btreeHeader.getRevision() ).append( "\n" ); |
| sb.append( " NbElems : " ).append( btreeHeader.getNbElems() ).append( "\n" ); |
| sb.append( " RootPage : 0x" ).append( Long.toHexString( btreeHeader.getRootPageOffset() ) ) |
| .append( "\n" ); |
| sb.append( " Info : 0x" ) |
| .append( Long.toHexString( ( ( PersistedBTree<K, V> ) btree ).getBtreeInfoOffset() ) ).append( "\n" ); |
| |
| LOG_PAGES.debug( "Btree Header[{}]\n{}", btreeHeader.getRevision(), sb.toString() ); |
| } |
| |
| flushPages( btreeHeaderPageIos ); |
| |
| btreeHeader.setBTreeHeaderOffset( btreeHeaderOffset ); |
| |
| return btreeHeaderOffset; |
| } |
| |
| |
| /** |
| * Write the B-tree informations on disk. We will write the following informations : |
| * <pre> |
| * +------------+ |
| * | pageSize | The B-tree page size (ie, the number of elements per page max) |
| * +------------+ |
| * | nameSize | The B-tree name size |
| * +------------+ |
| * | name | The B-tree name |
| * +------------+ |
| * | keySerSize | The keySerializer FQCN size |
| * +------------+ |
| * | keySerFQCN | The keySerializer FQCN |
| * +------------+ |
| * | valSerSize | The Value serializer FQCN size |
| * +------------+ |
| * | valSerKQCN | The valueSerializer FQCN |
| * +------------+ |
| * | dups | The flags that tell if the dups are allowed |
| * +------------+ |
| * </pre> |
| * @param btree The B-tree which header has to be written |
| * @return The B-tree header offset |
| * @throws IOException If we weren't able to write the B-tree header |
| */ |
| private <K, V> long writeBtreeInfo( BTree<K, V> btree ) throws IOException |
| { |
| // We will add the newly managed B-tree at the end of the header. |
| byte[] btreeNameBytes = Strings.getBytesUtf8( btree.getName() ); |
| byte[] keySerializerBytes = Strings.getBytesUtf8( btree.getKeySerializerFQCN() ); |
| byte[] valueSerializerBytes = Strings.getBytesUtf8( btree.getValueSerializerFQCN() ); |
| |
| int bufferSize = |
| INT_SIZE + // The page size |
| INT_SIZE + // The name size |
| btreeNameBytes.length + // The name |
| INT_SIZE + // The keySerializerBytes size |
| keySerializerBytes.length + // The keySerializerBytes |
| INT_SIZE + // The valueSerializerBytes size |
| valueSerializerBytes.length + // The valueSerializerBytes |
| INT_SIZE; // The allowDuplicates flag |
| |
| // Get the pageIOs we need to store the data. We may need more than one. |
| PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize ); |
| |
| // Keep the B-tree header Offset into the B-tree |
| long btreeInfoOffset = btreeHeaderPageIos[0].getOffset(); |
| |
| // Now store the B-tree information data in the pages : |
| // - the B-tree page size |
| // - the B-tree name |
| // - the keySerializer FQCN |
| // - the valueSerializer FQCN |
| // - the flags that tell if the dups are allowed |
| // Starts at 0 |
| long position = 0L; |
| |
| // The B-tree page size |
| position = store( position, btree.getPageSize(), btreeHeaderPageIos ); |
| |
| // The tree name |
| position = store( position, btreeNameBytes, btreeHeaderPageIos ); |
| |
| // The keySerializer FQCN |
| position = store( position, keySerializerBytes, btreeHeaderPageIos ); |
| |
| // The valueSerialier FQCN |
| position = store( position, valueSerializerBytes, btreeHeaderPageIos ); |
| |
| // The allowDuplicates flag |
| position = store( position, ( btree.isAllowDuplicates() ? 1 : 0 ), btreeHeaderPageIos ); |
| |
| // And flush the pages to disk now |
| LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() ); |
| flushPages( btreeHeaderPageIos ); |
| |
| return btreeInfoOffset; |
| } |
| |
| |
| /** |
| * Update the B-tree header after a B-tree modification. This will make the latest modification |
| * visible.<br/> |
| * We update the following fields : |
| * <ul> |
| * <li>the revision</li> |
| * <li>the number of elements</li> |
| * <li>the B-tree root page offset</li> |
| * </ul> |
| * <br/> |
| * As a result, a new version of the BtreHeader will be created, which will replace the previous |
| * B-tree header |
| * @param btree TheB-tree to update |
| * @param btreeHeaderOffset The offset of the modified btree header |
| * @return The offset of the new B-tree Header |
| * @throws IOException If we weren't able to write the file on disk |
| * @throws EndOfFileExceededException If we tried to write after the end of the file |
| */ |
| /* no qualifier */<K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset ) |
| throws EndOfFileExceededException, IOException |
| { |
| return updateBtreeHeader( btree, btreeHeaderOffset, false ); |
| } |
| |
| |
| /** |
| * Update the B-tree header after a B-tree modification. This will make the latest modification |
| * visible.<br/> |
| * We update the following fields : |
| * <ul> |
| * <li>the revision</li> |
| * <li>the number of elements</li> |
| * <li>the reference to the current B-tree revisions</li> |
| * <li>the reference to the old B-tree revisions</li> |
| * </ul> |
| * <br/> |
| * As a result, we new version of the BtreHeader will be created |
| * @param btree The B-tree to update |
| * @param btreeHeaderOffset The offset of the modified btree header |
| * @return The offset of the new B-tree Header if it has changed (ie, when the onPlace flag is set to true) |
| * @throws IOException |
| * @throws EndOfFileExceededException |
| */ |
| /* no qualifier */<K, V> void updateBtreeHeaderOnPlace( BTree<K, V> btree, long btreeHeaderOffset ) |
| throws EndOfFileExceededException, |
| IOException |
| { |
| updateBtreeHeader( btree, btreeHeaderOffset, true ); |
| } |
| |
| |
| /** |
| * Update the B-tree header after a B-tree modification. This will make the latest modification |
| * visible.<br/> |
| * We update the following fields : |
| * <ul> |
| * <li>the revision</li> |
| * <li>the number of elements</li> |
| * <li>the reference to the current B-tree revisions</li> |
| * <li>the reference to the old B-tree revisions</li> |
| * </ul> |
| * <br/> |
| * As a result, a new version of the BtreHeader will be created, which may replace the previous |
| * B-tree header (if the onPlace flag is set to true) or a new set of pageIos will contain the new |
| * version. |
| * |
| * @param btree The B-tree to update |
| * @param rootPageOffset The offset of the modified rootPage |
| * @param onPlace Tells if we modify the B-tree on place, or if we create a copy |
| * @return The offset of the new B-tree Header if it has changed (ie, when the onPlace flag is set to true) |
| * @throws EndOfFileExceededException If we tried to write after the end of the file |
| * @throws IOException If tehre were some error while writing the data on disk |
| */ |
| private <K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset, boolean onPlace ) |
| throws EndOfFileExceededException, IOException |
| { |
| // Read the pageIOs associated with this B-tree |
| PageIO[] pageIos; |
| long newBtreeHeaderOffset = NO_PAGE; |
| long offset = ( ( PersistedBTree<K, V> ) btree ).getBtreeOffset(); |
| |
| if ( onPlace ) |
| { |
| // We just have to update the existing BTreeHeader |
| long headerSize = LONG_SIZE + LONG_SIZE + LONG_SIZE; |
| |
| pageIos = readPageIOs( offset, headerSize ); |
| |
| // Now, update the revision |
| long position = 0; |
| |
| position = store( position, btree.getRevision(), pageIos ); |
| position = store( position, btree.getNbElems(), pageIos ); |
| position = store( position, btreeHeaderOffset, pageIos ); |
| |
| // Write the pages on disk |
| if ( LOG.isDebugEnabled() ) |
| { |
| LOG.debug( "-----> Flushing the '{}' B-treeHeader", btree.getName() ); |
| LOG.debug( " revision : " + btree.getRevision() + ", NbElems : " + btree.getNbElems() |
| + ", btreeHeader offset : 0x" |
| + Long.toHexString( btreeHeaderOffset ) ); |
| } |
| |
| // Get new place on disk to store the modified BTreeHeader if it's not onPlace |
| // Rewrite the pages at the same place |
| LOG.debug( "Rewriting the B-treeHeader on place for B-tree " + btree.getName() ); |
| flushPages( pageIos ); |
| } |
| else |
| { |
| // We have to read and copy the existing BTreeHeader and to create a new one |
| pageIos = readPageIOs( offset, Long.MAX_VALUE ); |
| |
| // Now, copy every read page |
| PageIO[] newPageIOs = new PageIO[pageIos.length]; |
| int pos = 0; |
| |
| for ( PageIO pageIo : pageIos ) |
| { |
| // Fetch a free page |
| newPageIOs[pos] = fetchNewPage(); |
| |
| // keep a track of the allocated and copied pages so that we can |
| // free them when we do a commit or rollback, if the btree is an management one |
| if ( ( btree.getType() == BTreeTypeEnum.BTREE_OF_BTREES ) |
| || ( btree.getType() == BTreeTypeEnum.COPIED_PAGES_BTREE ) ) |
| { |
| freedPages.add( pageIo ); |
| allocatedPages.add( newPageIOs[pos] ); |
| } |
| |
| pageIo.copy( newPageIOs[pos] ); |
| |
| if ( pos > 0 ) |
| { |
| newPageIOs[pos - 1].setNextPage( newPageIOs[pos].getOffset() ); |
| } |
| |
| pos++; |
| } |
| |
| // store the new btree header offset |
| // and update the revision |
| long position = 0; |
| |
| position = store( position, btree.getRevision(), newPageIOs ); |
| position = store( position, btree.getNbElems(), newPageIOs ); |
| position = store( position, btreeHeaderOffset, newPageIOs ); |
| |
| // Get new place on disk to store the modified BTreeHeader if it's not onPlace |
| // Flush the new B-treeHeader on disk |
| LOG.debug( "Rewriting the B-treeHeader on place for B-tree " + btree.getName() ); |
| flushPages( newPageIOs ); |
| |
| newBtreeHeaderOffset = newPageIOs[0].getOffset(); |
| } |
| |
| nbUpdateBtreeHeader.incrementAndGet(); |
| |
| if ( LOG_CHECK.isDebugEnabled() ) |
| { |
| MavibotInspector.check( this ); |
| } |
| |
| return newBtreeHeaderOffset; |
| } |
| |
| |
| /** |
| * Write the pages on disk, either at the end of the file, or at |
| * the position they were taken from. |
| * |
| * @param pageIos The list of pages to write |
| * @throws IOException If the write failed |
| */ |
| private void flushPages( PageIO... pageIos ) throws IOException |
| { |
| if ( LOG.isDebugEnabled() ) |
| { |
| for ( PageIO pageIo : pageIos ) |
| { |
| dump( pageIo ); |
| } |
| } |
| |
| for ( PageIO pageIo : pageIos ) |
| { |
| pageIo.getData().rewind(); |
| long pos = pageIo.getOffset(); |
| |
| if ( fileChannel.size() < ( pageIo.getOffset() + pageSize ) ) |
| { |
| LOG.debug( "Adding a page at the end of the file" ); |
| // This is a page we have to add to the file |
| pos = fileChannel.size(); |
| fileChannel.write( pageIo.getData(), pos ); |
| //fileChannel.force( false ); |
| } |
| else |
| { |
| LOG.debug( "Writing a page at position {}", pageIo.getOffset() ); |
| fileChannel.write( pageIo.getData(), pageIo.getOffset() ); |
| //fileChannel.force( false ); |
| } |
| |
| //System.out.println( "Writing page at " + Long.toHexString( pos ) ); |
| writeCounter.put( pos, writeCounter.containsKey( pos ) ? writeCounter.get( pos ) + 1 : 1 ); |
| |
| nbUpdatePageIOs.incrementAndGet(); |
| |
| pageIo.getData().rewind(); |
| } |
| } |
| |
| |
| /** |
| * Compute the page in which we will store data given an offset, when |
| * we have a list of pages. |
| * |
| * @param offset The position in the data |
| * @return The page number in which the offset will start |
| */ |
| private int computePageNb( long offset ) |
| { |
| long pageNb = 0; |
| |
| offset -= pageSize - LINK_SIZE - PAGE_SIZE; |
| |
| if ( offset < 0 ) |
| { |
| return ( int ) pageNb; |
| } |
| |
| pageNb = 1 + offset / ( pageSize - LINK_SIZE ); |
| |
| return ( int ) pageNb; |
| } |
| |
| |
| /** |
| * Stores a byte[] into one ore more pageIO (depending if the long is stored |
| * across a boundary or not) |
| * |
| * @param position The position in a virtual byte[] if all the pages were contiguous |
| * @param bytes The byte[] to serialize |
| * @param pageIos The pageIOs we have to store the data in |
| * @return The new offset |
| */ |
| private long store( long position, byte[] bytes, PageIO... pageIos ) |
| { |
| if ( bytes != null ) |
| { |
| // Write the bytes length |
| position = store( position, bytes.length, pageIos ); |
| |
| // Compute the page in which we will store the data given the |
| // current position |
| int pageNb = computePageNb( position ); |
| |
| // Get back the buffer in this page |
| ByteBuffer pageData = pageIos[pageNb].getData(); |
| |
| // Compute the position in the current page |
| int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize; |
| |
| // Compute the remaining size in the page |
| int remaining = pageData.capacity() - pagePos; |
| int nbStored = bytes.length; |
| |
| // And now, write the bytes until we have none |
| while ( nbStored > 0 ) |
| { |
| if ( remaining > nbStored ) |
| { |
| pageData.mark(); |
| pageData.position( pagePos ); |
| pageData.put( bytes, bytes.length - nbStored, nbStored ); |
| pageData.reset(); |
| nbStored = 0; |
| } |
| else |
| { |
| pageData.mark(); |
| pageData.position( pagePos ); |
| pageData.put( bytes, bytes.length - nbStored, remaining ); |
| pageData.reset(); |
| pageNb++; |
| pageData = pageIos[pageNb].getData(); |
| pagePos = LINK_SIZE; |
| nbStored -= remaining; |
| remaining = pageData.capacity() - pagePos; |
| } |
| } |
| |
| // We are done |
| position += bytes.length; |
| } |
| else |
| { |
| // No bytes : write 0 and return |
| position = store( position, 0, pageIos ); |
| } |
| |
| return position; |
| } |
| |
| |
| /** |
| * Stores a byte[] into one ore more pageIO (depending if the long is stored |
| * across a boundary or not). We don't add the byte[] size, it's already present |
| * in the received byte[]. |
| * |
| * @param position The position in a virtual byte[] if all the pages were contiguous |
| * @param bytes The byte[] to serialize |
| * @param pageIos The pageIOs we have to store the data in |
| * @return The new offset |
| */ |
| private long storeRaw( long position, byte[] bytes, PageIO... pageIos ) |
| { |
| if ( bytes != null ) |
| { |
| // Compute the page in which we will store the data given the |
| // current position |
| int pageNb = computePageNb( position ); |
| |
| // Get back the buffer in this page |
| ByteBuffer pageData = pageIos[pageNb].getData(); |
| |
| // Compute the position in the current page |
| int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize; |
| |
| // Compute the remaining size in the page |
| int remaining = pageData.capacity() - pagePos; |
| int nbStored = bytes.length; |
| |
| // And now, write the bytes until we have none |
| while ( nbStored > 0 ) |
| { |
| if ( remaining > nbStored ) |
| { |
| pageData.mark(); |
| pageData.position( pagePos ); |
| pageData.put( bytes, bytes.length - nbStored, nbStored ); |
| pageData.reset(); |
| nbStored = 0; |
| } |
| else |
| { |
| pageData.mark(); |
| pageData.position( pagePos ); |
| pageData.put( bytes, bytes.length - nbStored, remaining ); |
| pageData.reset(); |
| pageNb++; |
| |
| if ( pageNb == pageIos.length ) |
| { |
| // We can stop here : we have reach the end of the page |
| break; |
| } |
| |
| pageData = pageIos[pageNb].getData(); |
| pagePos = LINK_SIZE; |
| nbStored -= remaining; |
| remaining = pageData.capacity() - pagePos; |
| } |
| } |
| |
| // We are done |
| position += bytes.length; |
| } |
| else |
| { |
| // No bytes : write 0 and return |
| position = store( position, 0, pageIos ); |
| } |
| |
| return position; |
| } |
| |
| |
| /** |
| * Stores an Integer into one ore more pageIO (depending if the int is stored |
| * across a boundary or not) |
| * |
| * @param position The position in a virtual byte[] if all the pages were contiguous |
| * @param value The int to serialize |
| * @param pageIos The pageIOs we have to store the data in |
| * @return The new offset |
| */ |
| private long store( long position, int value, PageIO... pageIos ) |
| { |
| // Compute the page in which we will store the data given the |
| // current position |
| int pageNb = computePageNb( position ); |
| |
| // Compute the position in the current page |
| int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize; |
| |
| // Get back the buffer in this page |
| ByteBuffer pageData = pageIos[pageNb].getData(); |
| |
| // Compute the remaining size in the page |
| int remaining = pageData.capacity() - pagePos; |
| |
| if ( remaining < INT_SIZE ) |
| { |
| // We have to copy the serialized length on two pages |
| |
| switch ( remaining ) |
| { |
| case 3: |
| pageData.put( pagePos + 2, ( byte ) ( value >>> 8 ) ); |
| // Fallthrough !!! |
| |
| case 2: |
| pageData.put( pagePos + 1, ( byte ) ( value >>> 16 ) ); |
| // Fallthrough !!! |
| |
| case 1: |
| pageData.put( pagePos, ( byte ) ( value >>> 24 ) ); |
| break; |
| } |
| |
| // Now deal with the next page |
| pageData = pageIos[pageNb + 1].getData(); |
| pagePos = LINK_SIZE; |
| |
| switch ( remaining ) |
| { |
| case 1: |
| pageData.put( pagePos, ( byte ) ( value >>> 16 ) ); |
| // fallthrough !!! |
| |
| case 2: |
| pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 8 ) ); |
| // fallthrough !!! |
| |
| case 3: |
| pageData.put( pagePos + 3 - remaining, ( byte ) ( value ) ); |
| break; |
| } |
| } |
| else |
| { |
| // Store the value in the page at the selected position |
| pageData.putInt( pagePos, value ); |
| } |
| |
| // Increment the position to reflect the addition of an Int (4 bytes) |
| position += INT_SIZE; |
| |
| return position; |
| } |
| |
| |
| /** |
| * Stores a Long into one ore more pageIO (depending if the long is stored |
| * across a boundary or not) |
| * |
| * @param position The position in a virtual byte[] if all the pages were contiguous |
| * @param value The long to serialize |
| * @param pageIos The pageIOs we have to store the data in |
| * @return The new offset |
| */ |
| private long store( long position, long value, PageIO... pageIos ) |
| { |
| // Compute the page in which we will store the data given the |
| // current position |
| int pageNb = computePageNb( position ); |
| |
| // Compute the position in the current page |
| int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize; |
| |
| // Get back the buffer in this page |
| ByteBuffer pageData = pageIos[pageNb].getData(); |
| |
| // Compute the remaining size in the page |
| int remaining = pageData.capacity() - pagePos; |
| |
| if ( remaining < LONG_SIZE ) |
| { |
| // We have to copy the serialized length on two pages |
| |
| switch ( remaining ) |
| { |
| case 7: |
| pageData.put( pagePos + 6, ( byte ) ( value >>> 8 ) ); |
| // Fallthrough !!! |
| |
| case 6: |
| pageData.put( pagePos + 5, ( byte ) ( value >>> 16 ) ); |
| // Fallthrough !!! |
| |
| case 5: |
| pageData.put( pagePos + 4, ( byte ) ( value >>> 24 ) ); |
| // Fallthrough !!! |
| |
| case 4: |
| pageData.put( pagePos + 3, ( byte ) ( value >>> 32 ) ); |
| // Fallthrough !!! |
| |
| case 3: |
| pageData.put( pagePos + 2, ( byte ) ( value >>> 40 ) ); |
| // Fallthrough !!! |
| |
| case 2: |
| pageData.put( pagePos + 1, ( byte ) ( value >>> 48 ) ); |
| // Fallthrough !!! |
| |
| case 1: |
| pageData.put( pagePos, ( byte ) ( value >>> 56 ) ); |
| break; |
| } |
| |
| // Now deal with the next page |
| pageData = pageIos[pageNb + 1].getData(); |
| pagePos = LINK_SIZE; |
| |
| switch ( remaining ) |
| { |
| case 1: |
| pageData.put( pagePos, ( byte ) ( value >>> 48 ) ); |
| // fallthrough !!! |
| |
| case 2: |
| pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 40 ) ); |
| // fallthrough !!! |
| |
| case 3: |
| pageData.put( pagePos + 3 - remaining, ( byte ) ( value >>> 32 ) ); |
| // fallthrough !!! |
| |
| case 4: |
| pageData.put( pagePos + 4 - remaining, ( byte ) ( value >>> 24 ) ); |
| // fallthrough !!! |
| |
| case 5: |
| pageData.put( pagePos + 5 - remaining, ( byte ) ( value >>> 16 ) ); |
| // fallthrough !!! |
| |
| case 6: |
| pageData.put( pagePos + 6 - remaining, ( byte ) ( value >>> 8 ) ); |
| // fallthrough !!! |
| |
| case 7: |
| pageData.put( pagePos + 7 - remaining, ( byte ) ( value ) ); |
| break; |
| } |
| } |
| else |
| { |
| // Store the value in the page at the selected position |
| pageData.putLong( pagePos, value ); |
| } |
| |
| // Increment the position to reflect the addition of an Long (8 bytes) |
| position += LONG_SIZE; |
| |
| return position; |
| } |
| |
| |
| /** |
| * Write the page in a serialized form. |
| * |
| * @param btree The persistedBtree we will create a new PageHolder for |
| * @param newPage The page to write on disk |
| * @param newRevision The page's revision |
| * @return A PageHolder containing the copied page |
| * @throws IOException If the page can't be written on disk |
| */ |
| /* No qualifier*/<K, V> PageHolder<K, V> writePage( BTree<K, V> btree, Page<K, V> newPage, |
| long newRevision ) throws IOException |
| { |
| // We first need to save the new page on disk |
| PageIO[] pageIos = serializePage( btree, newRevision, newPage ); |
| |
| if ( LOG_PAGES.isDebugEnabled() ) |
| { |
| LOG_PAGES.debug( "Write data for '{}' btree", btree.getName() ); |
| |
| logPageIos( pageIos ); |
| } |
| |
| // Write the page on disk |
| flushPages( pageIos ); |
| |
| // Build the resulting reference |
| long offset = pageIos[0].getOffset(); |
| long lastOffset = pageIos[pageIos.length - 1].getOffset(); |
| PersistedPageHolder<K, V> pageHolder = new PersistedPageHolder<K, V>( btree, newPage, offset, |
| lastOffset ); |
| |
| return pageHolder; |
| } |
| |
| |
    /**
     * Dump the given PageIOs into the pages logger as a hex listing : one
     * header line per PageIO (index, offset, size) followed by its bytes,
     * 16 per line, grouped by 4.
     *
     * @param pageIos The PageIOs to dump
     */
    /* No qualifier */static void logPageIos( PageIO[] pageIos )
    {
        int pageNb = 0;

        for ( PageIO pageIo : pageIos )
        {
            StringBuilder sb = new StringBuilder();
            sb.append( "PageIO[" ).append( pageNb ).append( "]:0x" );
            sb.append( Long.toHexString( pageIo.getOffset() ) ).append( "/" );
            sb.append( pageIo.getSize() );
            pageNb++;

            ByteBuffer data = pageIo.getData();

            // Remember the buffer position so it can be restored after the dump
            int position = data.position();
            // Dump the payload plus 12 extra bytes of header/link
            int dataLength = ( int ) pageIo.getSize() + 12;

            if ( dataLength > data.limit() )
            {
                // Cap the dump at the buffer limit
                dataLength = data.limit();
            }

            byte[] bytes = new byte[dataLength];

            data.get( bytes );
            data.position( position );
            int pos = 0;

            for ( byte b : bytes )
            {
                int mod = pos % 16;

                // The switch deliberately falls through : a new 16-byte line
                // gets a newline prefix, every 4-byte group gets an extra gap,
                // and every byte is appended in hex
                switch ( mod )
                {
                    case 0:
                        sb.append( "\n  " );
                        // No break
                    case 4:
                    case 8:
                    case 12:
                        sb.append( " " );
                    case 1:
                    case 2:
                    case 3:
                    case 5:
                    case 6:
                    case 7:
                    case 9:
                    case 10:
                    case 11:
                    case 13:
                    case 14:
                    case 15:
                        sb.append( Strings.dumpByte( b ) ).append( " " );
                }
                pos++;
            }

            LOG_PAGES.debug( sb.toString() );
        }
    }
| |
| |
| /** |
| * Compute the number of pages needed to store some specific size of data. |
| * |
| * @param dataSize The size of the data we want to store in pages |
| * @return The number of pages needed |
| */ |
| private int computeNbPages( int dataSize ) |
| { |
| if ( dataSize <= 0 ) |
| { |
| return 0; |
| } |
| |
| // Compute the number of pages needed. |
| // Considering that each page can contain PageSize bytes, |
| // but that the first 8 bytes are used for links and we |
| // use 4 bytes to store the data size, the number of needed |
| // pages is : |
| // NbPages = ( (dataSize - (PageSize - 8 - 4 )) / (PageSize - 8) ) + 1 |
| // NbPages += ( if (dataSize - (PageSize - 8 - 4 )) % (PageSize - 8) > 0 : 1 : 0 ) |
| int availableSize = ( pageSize - LONG_SIZE ); |
| int nbNeededPages = 1; |
| |
| // Compute the number of pages that will be full but the first page |
| if ( dataSize > availableSize - INT_SIZE ) |
| { |
| int remainingSize = dataSize - ( availableSize - INT_SIZE ); |
| nbNeededPages += remainingSize / availableSize; |
| int remain = remainingSize % availableSize; |
| |
| if ( remain > 0 ) |
| { |
| nbNeededPages++; |
| } |
| } |
| |
| return nbNeededPages; |
| } |
| |
| |
| /** |
| * Get as many pages as needed to store the data of the given size. The returned |
| * PageIOs are all linked together. |
| * |
| * @param dataSize The data size |
| * @return An array of pages, enough to store the full data |
| */ |
| private PageIO[] getFreePageIOs( int dataSize ) throws IOException |
| { |
| if ( dataSize == 0 ) |
| { |
| return new PageIO[] |
| {}; |
| } |
| |
| int nbNeededPages = computeNbPages( dataSize ); |
| |
| PageIO[] pageIOs = new PageIO[nbNeededPages]; |
| |
| // The first page : set the size |
| pageIOs[0] = fetchNewPage(); |
| pageIOs[0].setSize( dataSize ); |
| |
| for ( int i = 1; i < nbNeededPages; i++ ) |
| { |
| pageIOs[i] = fetchNewPage(); |
| |
| // Create the link |
| pageIOs[i - 1].setNextPage( pageIOs[i].getOffset() ); |
| } |
| |
| return pageIOs; |
| } |
| |
| |
| /** |
| * Return a new Page. We take one of the existing free pages, or we create |
| * a new page at the end of the file. |
| * |
| * @return The fetched PageIO |
| */ |
| private PageIO fetchNewPage() throws IOException |
| { |
| //System.out.println( "Fetching new page" ); |
| if ( firstFreePage == NO_PAGE ) |
| { |
| nbCreatedPages.incrementAndGet(); |
| |
| // We don't have any free page. Reclaim some new page at the end |
| // of the file |
| PageIO newPage = new PageIO( endOfFileOffset ); |
| |
| endOfFileOffset += pageSize; |
| |
| ByteBuffer data = ByteBuffer.allocateDirect( pageSize ); |
| |
| newPage.setData( data ); |
| newPage.setNextPage( NO_PAGE ); |
| newPage.setSize( 0 ); |
| |
| LOG.debug( "Requiring a new page at offset {}", newPage.getOffset() ); |
| |
| return newPage; |
| } |
| else |
| { |
| nbReusedPages.incrementAndGet(); |
| |
| freePageLock.lock(); |
| |
| // We have some existing free page. Fetch it from disk |
| PageIO pageIo = fetchPage( firstFreePage ); |
| |
| // Update the firstFreePage pointer |
| firstFreePage = pageIo.getNextPage(); |
| |
| freePageLock.unlock(); |
| |
| // overwrite the data of old page |
| ByteBuffer data = ByteBuffer.allocateDirect( pageSize ); |
| pageIo.setData( data ); |
| |
| pageIo.setNextPage( NO_PAGE ); |
| pageIo.setSize( 0 ); |
| |
| LOG.debug( "Reused page at offset {}", pageIo.getOffset() ); |
| |
| return pageIo; |
| } |
| } |
| |
| |
| /** |
| * fetch a page from disk, knowing its position in the file. |
| * |
| * @param offset The position in the file |
| * @return The found page |
| */ |
| /* no qualifier */PageIO fetchPage( long offset ) throws IOException, EndOfFileExceededException |
| { |
| checkOffset( offset ); |
| |
| if ( fileChannel.size() < offset + pageSize ) |
| { |
| // Error : we are past the end of the file |
| throw new EndOfFileExceededException( "We are fetching a page on " + offset + |
| " when the file's size is " + fileChannel.size() ); |
| } |
| else |
| { |
| // Read the page |
| fileChannel.position( offset ); |
| |
| ByteBuffer data = ByteBuffer.allocate( pageSize ); |
| fileChannel.read( data ); |
| data.rewind(); |
| |
| PageIO readPage = new PageIO( offset ); |
| readPage.setData( data ); |
| |
| return readPage; |
| } |
| } |
| |
| |
| /** |
| * @return the pageSize |
| */ |
| public int getPageSize() |
| { |
| return pageSize; |
| } |
| |
| |
| /** |
| * Set the page size, ie the number of bytes a page can store. |
| * |
| * @param pageSize The number of bytes for a page |
| */ |
| /* no qualifier */void setPageSize( int pageSize ) |
| { |
| if ( this.pageSize >= 13 ) |
| { |
| this.pageSize = pageSize; |
| } |
| else |
| { |
| this.pageSize = DEFAULT_PAGE_SIZE; |
| } |
| } |
| |
| |
| /** |
| * Close the RecordManager and flush everything on disk |
| */ |
| public void close() throws IOException |
| { |
| beginTransaction(); |
| |
| // Close all the managed B-trees |
| for ( BTree<Object, Object> tree : managedBtrees.values() ) |
| { |
| tree.close(); |
| } |
| |
| // Close the management B-trees |
| btreeOfBtrees.close(); |
| |
| managedBtrees.clear(); |
| |
| // Write the data |
| fileChannel.force( true ); |
| |
| // And close the channel |
| fileChannel.close(); |
| |
| reclaimer.storeCopiedPageMap( file.getParentFile() ); |
| |
| commit(); |
| } |
| |
| /** Hex chars */ |
| private static final byte[] HEX_CHAR = new byte[] |
| { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; |
| |
| |
| public static String dump( byte octet ) |
| { |
| return new String( new byte[] |
| { HEX_CHAR[( octet & 0x00F0 ) >> 4], HEX_CHAR[octet & 0x000F] } ); |
| } |
| |
| |
| /** |
| * Dump a pageIO |
| */ |
| private void dump( PageIO pageIo ) |
| { |
| ByteBuffer buffer = pageIo.getData(); |
| buffer.mark(); |
| byte[] longBuffer = new byte[LONG_SIZE]; |
| byte[] intBuffer = new byte[INT_SIZE]; |
| |
| // get the next page offset |
| buffer.get( longBuffer ); |
| long nextOffset = LongSerializer.deserialize( longBuffer ); |
| |
| // Get the data size |
| buffer.get( intBuffer ); |
| int size = IntSerializer.deserialize( intBuffer ); |
| |
| buffer.reset(); |
| |
| System.out.println( "PageIO[" + Long.toHexString( pageIo.getOffset() ) + "], size = " + size + ", NEXT PageIO:" |
| + Long.toHexString( nextOffset ) ); |
| System.out.println( " 0 1 2 3 4 5 6 7 8 9 A B C D E F " ); |
| System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" ); |
| |
| for ( int i = 0; i < buffer.limit(); i += 16 ) |
| { |
| System.out.print( "|" ); |
| |
| for ( int j = 0; j < 16; j++ ) |
| { |
| System.out.print( dump( buffer.get() ) ); |
| |
| if ( j == 15 ) |
| { |
| System.out.println( "|" ); |
| } |
| else |
| { |
| System.out.print( " " ); |
| } |
| } |
| } |
| |
| System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" ); |
| |
| buffer.reset(); |
| } |
| |
| |
| /** |
| * Dump the RecordManager file |
| * @throws IOException |
| */ |
| public void dump() |
| { |
| System.out.println( "/---------------------------- Dump ----------------------------\\" ); |
| |
| try |
| { |
| RandomAccessFile randomFile = new RandomAccessFile( file, "r" ); |
| FileChannel fileChannel = randomFile.getChannel(); |
| |
| ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE ); |
| |
| // load the RecordManager header |
| fileChannel.read( recordManagerHeader ); |
| |
| recordManagerHeader.rewind(); |
| |
| // The page size |
| long fileSize = fileChannel.size(); |
| int pageSize = recordManagerHeader.getInt(); |
| long nbPages = fileSize / pageSize; |
| |
| // The number of managed B-trees |
| int nbBtree = recordManagerHeader.getInt(); |
| |
| // The first free page |
| long firstFreePage = recordManagerHeader.getLong(); |
| |
| // The current B-tree of B-trees |
| long currentBtreeOfBtreesPage = recordManagerHeader.getLong(); |
| |
| // The previous B-tree of B-trees |
| long previousBtreeOfBtreesPage = recordManagerHeader.getLong(); |
| |
| // The current CopiedPages B-tree |
| long currentCopiedPagesBtreePage = recordManagerHeader.getLong(); |
| |
| // The previous CopiedPages B-tree |
| long previousCopiedPagesBtreePage = recordManagerHeader.getLong(); |
| |
| System.out.println( " RecordManager" ); |
| System.out.println( " -------------" ); |
| System.out.println( " Size = 0x" + Long.toHexString( fileSize ) ); |
| System.out.println( " NbPages = " + nbPages ); |
| System.out.println( " Header " ); |
| System.out.println( " page size : " + pageSize ); |
| System.out.println( " nbTree : " + nbBtree ); |
| System.out.println( " firstFreePage : 0x" + Long.toHexString( firstFreePage ) ); |
| System.out.println( " current BOB : 0x" + Long.toHexString( currentBtreeOfBtreesPage ) ); |
| System.out.println( " previous BOB : 0x" + Long.toHexString( previousBtreeOfBtreesPage ) ); |
| System.out.println( " current CopiedPages : 0x" + Long.toHexString( currentCopiedPagesBtreePage ) ); |
| System.out.println( " previous CopiedPages : 0x" + Long.toHexString( previousCopiedPagesBtreePage ) ); |
| |
| // Dump the Free pages list |
| dumpFreePages( firstFreePage ); |
| |
| // Dump the B-tree of B-trees |
| dumpBtreeHeader( currentBtreeOfBtreesPage ); |
| |
| // Dump the previous B-tree of B-trees if any |
| if ( previousBtreeOfBtreesPage != NO_PAGE ) |
| { |
| dumpBtreeHeader( previousBtreeOfBtreesPage ); |
| } |
| |
| // Dump the CopiedPages B-tree |
| dumpBtreeHeader( currentCopiedPagesBtreePage ); |
| |
| // Dump the previous B-tree of B-trees if any |
| if ( previousCopiedPagesBtreePage != NO_PAGE ) |
| { |
| dumpBtreeHeader( previousCopiedPagesBtreePage ); |
| } |
| |
| // Dump all the user's B-tree |
| randomFile.close(); |
| System.out.println( "\\---------------------------- Dump ----------------------------/" ); |
| } |
| catch ( IOException ioe ) |
| { |
| System.out.println( "Exception while dumping the file : " + ioe.getMessage() ); |
| } |
| } |
| |
| |
| /** |
| * Dump the free pages |
| */ |
| private void dumpFreePages( long freePageOffset ) throws EndOfFileExceededException, IOException |
| { |
| System.out.println( "\n FreePages : " ); |
| int pageNb = 1; |
| |
| while ( freePageOffset != NO_PAGE ) |
| { |
| PageIO pageIo = fetchPage( freePageOffset ); |
| |
| System.out.println( " freePage[" + pageNb + "] : 0x" + Long.toHexString( pageIo.getOffset() ) ); |
| |
| freePageOffset = pageIo.getNextPage(); |
| pageNb++; |
| } |
| } |
| |
| |
| /** |
| * Dump a B-tree Header |
| */ |
| private long dumpBtreeHeader( long btreeOffset ) throws EndOfFileExceededException, IOException |
| { |
| // First read the B-tree header |
| PageIO[] pageIos = readPageIOs( btreeOffset, Long.MAX_VALUE ); |
| |
| long dataPos = 0L; |
| |
| // The B-tree current revision |
| long revision = readLong( pageIos, dataPos ); |
| dataPos += LONG_SIZE; |
| |
| // The nb elems in the tree |
| long nbElems = readLong( pageIos, dataPos ); |
| dataPos += LONG_SIZE; |
| |
| // The B-tree rootPage offset |
| long rootPageOffset = readLong( pageIos, dataPos ); |
| dataPos += LONG_SIZE; |
| |
| // The B-tree page size |
| int btreePageSize = readInt( pageIos, dataPos ); |
| dataPos += INT_SIZE; |
| |
| // The tree name |
| ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos ); |
| dataPos += INT_SIZE + btreeNameBytes.limit(); |
| String btreeName = Strings.utf8ToString( btreeNameBytes ); |
| |
| // The keySerializer FQCN |
| ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos ); |
| dataPos += INT_SIZE + keySerializerBytes.limit(); |
| |
| String keySerializerFqcn = ""; |
| |
| if ( keySerializerBytes != null ) |
| { |
| keySerializerFqcn = Strings.utf8ToString( keySerializerBytes ); |
| } |
| |
| // The valueSerialier FQCN |
| ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos ); |
| |
| String valueSerializerFqcn = ""; |
| dataPos += INT_SIZE + valueSerializerBytes.limit(); |
| |
| if ( valueSerializerBytes != null ) |
| { |
| valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes ); |
| } |
| |
| // The B-tree allowDuplicates flag |
| int allowDuplicates = readInt( pageIos, dataPos ); |
| boolean dupsAllowed = allowDuplicates != 0; |
| |
| dataPos += INT_SIZE; |
| |
| // System.out.println( "\n B-Tree " + btreeName ); |
| // System.out.println( " ------------------------- " ); |
| |
| // System.out.println( " nbPageIOs[" + pageIos.length + "] = " + pageIoList ); |
| if ( LOG.isDebugEnabled() ) |
| { |
| StringBuilder sb = new StringBuilder(); |
| boolean isFirst = true; |
| |
| for ( PageIO pageIo : pageIos ) |
| { |
| if ( isFirst ) |
| { |
| isFirst = false; |
| } |
| else |
| { |
| sb.append( ", " ); |
| } |
| |
| sb.append( "0x" ).append( Long.toHexString( pageIo.getOffset() ) ); |
| } |
| |
| String pageIoList = sb.toString(); |
| |
| LOG.debug( " PageIOs[{}] = {}", pageIos.length, pageIoList ); |
| |
| // System.out.println( " dataSize = "+ pageIos[0].getSize() ); |
| LOG.debug( " dataSize = {}", pageIos[0].getSize() ); |
| |
| LOG.debug( " B-tree '{}'", btreeName ); |
| LOG.debug( " revision : {}", revision ); |
| LOG.debug( " nbElems : {}", nbElems ); |
| LOG.debug( " rootPageOffset : 0x{}", Long.toHexString( rootPageOffset ) ); |
| LOG.debug( " B-tree page size : {}", btreePageSize ); |
| LOG.debug( " keySerializer : '{}'", keySerializerFqcn ); |
| LOG.debug( " valueSerializer : '{}'", valueSerializerFqcn ); |
| LOG.debug( " dups allowed : {}", dupsAllowed ); |
| // |
| // System.out.println( " B-tree '" + btreeName + "'" ); |
| // System.out.println( " revision : " + revision ); |
| // System.out.println( " nbElems : " + nbElems ); |
| // System.out.println( " rootPageOffset : 0x" + Long.toHexString( rootPageOffset ) ); |
| // System.out.println( " B-tree page size : " + btreePageSize ); |
| // System.out.println( " keySerializer : " + keySerializerFqcn ); |
| // System.out.println( " valueSerializer : " + valueSerializerFqcn ); |
| // System.out.println( " dups allowed : " + dupsAllowed ); |
| } |
| |
| return rootPageOffset; |
| } |
| |
| |
| /** |
| * Get the number of managed trees. We don't count the CopiedPage B-tree and the B-tree of B-trees |
| * |
| * @return The number of managed B-trees |
| */ |
| public int getNbManagedTrees() |
| { |
| return nbBtree; |
| } |
| |
| |
| /** |
| * Get the managed B-trees. We don't return the CopiedPage B-tree nor the B-tree of B-trees. |
| * |
| * @return The managed B-trees |
| */ |
| public Set<String> getManagedTrees() |
| { |
| Set<String> btrees = new HashSet<String>( managedBtrees.keySet() ); |
| |
| return btrees; |
| } |
| |
| |
| /** |
| * Stores the copied pages into the CopiedPages B-tree |
| * |
| * @param name The B-tree name |
| * @param revision The revision |
| * @param copiedPages The pages that have been copied while creating this revision |
| * @throws IOException If we weren't able to store the data on disk |
| */ |
| /* No Qualifier */void storeCopiedPages( String name, long revision, long[] copiedPages ) throws IOException |
| { |
| RevisionName revisionName = new RevisionName( revision, name ); |
| |
| copiedPageMap.put( revisionName, copiedPages ); |
| } |
| |
| |
| /** |
| * Store a reference to an old rootPage into the Revision B-tree |
| * |
| * @param btree The B-tree we want to keep an old RootPage for |
| * @param rootPage The old rootPage |
| * @throws IOException If we have an issue while writing on disk |
| */ |
| /* No qualifier */<K, V> void storeRootPage( BTree<K, V> btree, Page<K, V> rootPage ) throws IOException |
| { |
| if ( !isKeepRevisions() ) |
| { |
| return; |
| } |
| |
| NameRevision nameRevision = new NameRevision( btree.getName(), rootPage.getRevision() ); |
| |
| ( ( AbstractBTree<NameRevision, Long> ) btreeOfBtrees ).insert( nameRevision, |
| ( ( AbstractPage<K, V> ) rootPage ).getOffset(), 0 ); |
| |
| if ( LOG_CHECK.isDebugEnabled() ) |
| { |
| MavibotInspector.check( this ); |
| } |
| } |
| |
| |
| /** |
| * Fetch the rootPage of a given B-tree for a given revision. |
| * |
| * @param btree The B-tree we are interested in |
| * @param revision The revision we want to get back |
| * @return The rootPage for this B-tree and this revision, if any |
| * @throws KeyNotFoundException If we can't find the rootPage for this revision and this B-tree |
| * @throws IOException If we had an ise while accessing the data on disk |
| */ |
| /* No qualifier */<K, V> Page<K, V> getRootPage( BTree<K, V> btree, long revision ) throws KeyNotFoundException, |
| IOException |
| { |
| if ( btree.getRevision() == revision ) |
| { |
| // We are asking for the current revision |
| return btree.getRootPage(); |
| } |
| |
| // Get the B-tree header offset |
| NameRevision nameRevision = new NameRevision( btree.getName(), revision ); |
| long btreeHeaderOffset = btreeOfBtrees.get( nameRevision ); |
| |
| // get the B-tree rootPage |
| Page<K, V> btreeRoot = readRootPage( btree, btreeHeaderOffset ); |
| |
| return btreeRoot; |
| } |
| |
| |
| /** |
| * Read a root page from the B-tree header offset |
| */ |
| private <K, V> Page<K, V> readRootPage( BTree<K, V> btree, long btreeHeaderOffset ) |
| throws EndOfFileExceededException, IOException |
| { |
| // Read the B-tree header pages on disk |
| PageIO[] btreeHeaderPageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE ); |
| long dataPos = LONG_SIZE + LONG_SIZE; |
| |
| // The B-tree rootPage offset |
| long rootPageOffset = readLong( btreeHeaderPageIos, dataPos ); |
| |
| // Read the rootPage pages on disk |
| PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE ); |
| |
| // Now, convert it to a Page |
| Page<K, V> btreeRoot = readPage( btree, rootPageIos ); |
| |
| return btreeRoot; |
| } |
| |
| |
| /** |
| * Get one managed trees, knowing its name. |
| * |
| * @param name The B-tree name we are looking for |
| * @return The managed B-trees |
| */ |
| public <K, V> BTree<K, V> getManagedTree( String name ) |
| { |
| return ( BTree<K, V> ) managedBtrees.get( name ); |
| } |
| |
| |
| /** |
| * Move a list of pages to the free page list. A logical page is associated with one |
| * or more physical PageIOs, which are on the disk. We have to move all those PagIO instances |
| * to the free list, and do the same in memory (we try to keep a reference to a set of |
| * free pages. |
| * |
| * @param btree The B-tree which were owning the pages |
| * @param revision The current revision |
| * @param pages The pages to free |
| * @throws IOException If we had a problem while updating the file |
| * @throws EndOfFileExceededException If we tried to write after the end of the file |
| */ |
| /* Package protected */<K, V> void freePages( BTree<K, V> btree, long revision, List<Page<K, V>> pages ) |
| throws EndOfFileExceededException, IOException |
| { |
| if ( ( pages == null ) || pages.isEmpty() ) |
| { |
| return; |
| } |
| |
| if ( !keepRevisions ) |
| { |
| // if the B-tree doesn't keep revisions, we can safely move |
| // the pages to the freed page list. |
| if ( LOG.isDebugEnabled() ) |
| { |
| LOG.debug( "Freeing the following pages :" ); |
| |
| for ( Page<K, V> page : pages ) |
| { |
| LOG.debug( " {}", page ); |
| } |
| } |
| |
| for ( Page<K, V> page : pages ) |
| { |
| long pageOffset = ( ( AbstractPage<K, V> ) page ).getOffset(); |
| |
| PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE ); |
| |
| for ( PageIO pageIo : pageIos ) |
| { |
| freedPages.add( pageIo ); |
| } |
| } |
| } |
| else |
| { |
| // We are keeping revisions of standard B-trees, so we move the pages to the CopiedPages B-tree |
| // but only for non managed B-trees |
| if ( LOG.isDebugEnabled() ) |
| { |
| LOG.debug( "Moving the following pages to the CopiedBtree :" ); |
| |
| for ( Page<K, V> page : pages ) |
| { |
| LOG.debug( " {}", page ); |
| } |
| } |
| |
| long[] pageOffsets = new long[pages.size()]; |
| int pos = 0; |
| |
| for ( Page<K, V> page : pages ) |
| { |
| pageOffsets[pos++] = ( ( AbstractPage<K, V> ) page ).offset; |
| } |
| |
| if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES ) |
| && ( btree.getType() != BTreeTypeEnum.COPIED_PAGES_BTREE ) ) |
| { |
| // Deal with standard B-trees |
| RevisionName revisionName = new RevisionName( revision, btree.getName() ); |
| |
| copiedPageMap.put( revisionName, pageOffsets ); |
| } |
| else |
| { |
| // Managed B-trees : we simply free the copied pages |
| for ( long pageOffset : pageOffsets ) |
| { |
| PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE ); |
| |
| for ( PageIO pageIo : pageIos ) |
| { |
| freedPages.add( pageIo ); |
| } |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * Add a PageIO to the list of free PageIOs |
| * |
| * @param pageIo The page to free |
| * @throws IOException If we weren't capable of updating the file |
| */ |
| private void free( PageIO pageIo ) throws IOException |
| { |
| freePageLock.lock(); |
| |
| // We add the Page's PageIOs before the |
| // existing free pages. |
| // Link it to the first free page |
| pageIo.setNextPage( firstFreePage ); |
| |
| LOG.debug( "Flushing the first free page" ); |
| |
| // And flush it to disk |
| //FIXME can be flushed last after releasing the lock |
| flushPages( pageIo ); |
| |
| // We can update the firstFreePage offset |
| firstFreePage = pageIo.getOffset(); |
| |
| freePageLock.unlock(); |
| } |
| |
| |
| /** |
| * Add an array of PageIOs to the list of free PageIOs |
| * |
| * @param offsets The offsets of the pages whose associated PageIOs will be fetched and freed. |
| * @throws IOException If we weren't capable of updating the file |
| */ |
| public void free( long[] offsets ) throws IOException |
| { |
| List<PageIO> pageIos = new ArrayList<PageIO>(); |
| int pageIndex = 0; |
| for ( int i = 0; i < offsets.length; i++ ) |
| { |
| PageIO[] ios = readPageIOs( offsets[i], Long.MAX_VALUE ); |
| for ( PageIO io : ios ) |
| { |
| pageIos.add( io ); |
| |
| if ( pageIndex > 0 ) |
| { |
| pageIos.get( pageIndex - 1 ).setNextPage( io.getOffset() ); |
| } |
| |
| pageIndex++; |
| } |
| } |
| |
| freePageLock.lock(); |
| |
| // We add the Page's PageIOs before the |
| // existing free pages. |
| // Link it to the first free page |
| pageIos.get( pageIndex - 1 ).setNextPage( firstFreePage ); |
| |
| LOG.debug( "Flushing the first free page" ); |
| |
| // And flush it to disk |
| //FIXME can be flushed last after releasing the lock |
| flushPages( pageIos.toArray( new PageIO[0] ) ); |
| |
| // We can update the firstFreePage offset |
| firstFreePage = pageIos.get( 0 ).getOffset(); |
| |
| freePageLock.unlock(); |
| } |
| |
| |
| /** |
| * @return the keepRevisions flag |
| */ |
| public boolean isKeepRevisions() |
| { |
| return keepRevisions; |
| } |
| |
| |
| /** |
| * @param keepRevisions the keepRevisions flag to set |
| */ |
| public void setKeepRevisions( boolean keepRevisions ) |
| { |
| this.keepRevisions = keepRevisions; |
| } |
| |
| |
| /** |
| * Creates a B-tree and automatically adds it to the list of managed btrees |
| * |
| * @param name the name of the B-tree |
| * @param keySerializer key serializer |
| * @param valueSerializer value serializer |
| * @param allowDuplicates flag for allowing duplicate keys |
| * @return a managed B-tree |
| * @throws IOException If we weren't able to update the file on disk |
| * @throws BTreeAlreadyManagedException If the B-tree is already managed |
| */ |
| @SuppressWarnings("all") |
| public <K, V> BTree<K, V> addBTree( String name, ElementSerializer<K> keySerializer, |
| ElementSerializer<V> valueSerializer, boolean allowDuplicates ) |
| throws IOException, BTreeAlreadyManagedException |
| { |
| PersistedBTreeConfiguration config = new PersistedBTreeConfiguration(); |
| |
| config.setName( name ); |
| config.setKeySerializer( keySerializer ); |
| config.setValueSerializer( valueSerializer ); |
| config.setAllowDuplicates( allowDuplicates ); |
| |
| BTree btree = new PersistedBTree( config ); |
| manage( btree ); |
| |
| if ( LOG_CHECK.isDebugEnabled() ) |
| { |
| MavibotInspector.check( this ); |
| } |
| |
| return btree; |
| } |
| |
| |
| /** |
| * Add a newly closd transaction into the closed transaction queue |
| */ |
| /* no qualifier */<K, V> void releaseTransaction( ReadTransaction<K, V> readTransaction ) |
| { |
| RevisionName revisionName = new RevisionName( |
| readTransaction.getRevision(), |
| readTransaction.getBtreeHeader().getBtree().getName() ); |
| //closedTransactionsQueue.add( revisionName ); |
| } |
| |
| |
| /** |
| * Get the current BTreeHeader for a given Btree. It might not exist |
| */ |
| public BTreeHeader getBTreeHeader( String name ) |
| { |
| // Get a lock |
| btreeHeadersLock.readLock().lock(); |
| |
| // get the current BTree Header for this BTree and revision |
| BTreeHeader<?, ?> btreeHeader = currentBTreeHeaders.get( name ); |
| |
| // And unlock |
| btreeHeadersLock.readLock().unlock(); |
| |
| return btreeHeader; |
| } |
| |
| |
| /** |
| * Get the new BTreeHeader for a given Btree. It might not exist |
| */ |
| public BTreeHeader getNewBTreeHeader( String name ) |
| { |
| // get the current BTree Header for this BTree and revision |
| BTreeHeader<?, ?> btreeHeader = newBTreeHeaders.get( name ); |
| |
| return btreeHeader; |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public void updateNewBTreeHeaders( BTreeHeader btreeHeader ) |
| { |
| newBTreeHeaders.put( btreeHeader.getBtree().getName(), btreeHeader ); |
| } |
| |
| |
| /** |
| * Swap the current BtreeHeader map with the new one. This method will only |
| * be called in a single trhead, when the current transaction will be committed. |
| */ |
| private void swapCurrentBtreeHeaders() |
| { |
| // Copy the reference to the current BtreeHeader Map |
| Map<String, BTreeHeader<?, ?>> tmp = currentBTreeHeaders; |
| |
| // Get a write lock |
| btreeHeadersLock.writeLock().lock(); |
| |
| // Swap the new BTreeHeader Map |
| currentBTreeHeaders = newBTreeHeaders; |
| |
| // And unlock |
| btreeHeadersLock.writeLock().unlock(); |
| |
| // Last, not least, clear the Map and reinject the latest revision in it |
| tmp.clear(); |
| tmp.putAll( currentBTreeHeaders ); |
| |
| // And update the new BTreeHeader map |
| newBTreeHeaders = tmp; |
| } |
| |
| |
| /** |
| * revert the new BTreeHeaders Map to the current BTreeHeader Map. This method |
| * is called when we have to rollback a transaction. |
| */ |
| private void revertBtreeHeaders() |
| { |
| // Clean up teh new BTreeHeaders Map |
| newBTreeHeaders.clear(); |
| |
| // Reinject the latest revision in it |
| newBTreeHeaders.putAll( currentBTreeHeaders ); |
| } |
| |
| |
| /** |
| * Loads a B-tree holding the values of a duplicate key |
| * This tree is also called as dups tree or sub tree |
| * |
| * @param offset the offset of the B-tree header |
| * @return the deserialized B-tree |
| */ |
| /* No qualifier */<K, V> BTree<V, V> loadDupsBtree( long btreeHeaderOffset, BTree<K, V> parentBtree ) |
| { |
| PageIO[] pageIos = null; |
| try |
| { |
| pageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE ); |
| |
| BTree<V, V> subBtree = BTreeFactory.<V, V> createPersistedBTree( BTreeTypeEnum.PERSISTED_SUB ); |
| loadBtree( pageIos, subBtree, parentBtree ); |
| |
| return subBtree; |
| } |
| catch ( Exception e ) |
| { |
| // should not happen |
| throw new BTreeCreationException( e ); |
| } |
| } |
| |
| |
| private void checkFreePages() throws EndOfFileExceededException, IOException |
| { |
| //System.out.println( "Checking the free pages, starting from " + Long.toHexString( firstFreePage ) ); |
| |
| // read all the free pages, add them into a set, to be sure we don't have a cycle |
| Set<Long> freePageOffsets = new HashSet<Long>(); |
| |
| long currentFreePageOffset = firstFreePage; |
| |
| while ( currentFreePageOffset != NO_PAGE ) |
| { |
| //System.out.println( "Next page offset :" + Long.toHexString( currentFreePageOffset ) ); |
| |
| if ( ( currentFreePageOffset % pageSize ) != 0 ) |
| { |
| throw new InvalidOffsetException( "Wrong offset : " + Long.toHexString( currentFreePageOffset ) ); |
| } |
| |
| if ( freePageOffsets.contains( currentFreePageOffset ) ) |
| { |
| throw new InvalidOffsetException( "Offset : " + Long.toHexString( currentFreePageOffset ) |
| + " already read, there is a cycle" ); |
| } |
| |
| freePageOffsets.add( currentFreePageOffset ); |
| PageIO pageIO = fetchPage( currentFreePageOffset ); |
| |
| currentFreePageOffset = pageIO.getNextPage(); |
| } |
| |
| return; |
| } |
| |
| |
| /** |
| * sets the threshold of the number of commits to be performed before |
| * reclaiming the free pages. |
| * |
| * @param spaceReclaimerThreshold the number of commits before the reclaimer runs |
| */ |
| public void setSpaceReclaimerThreshold( int spaceReclaimerThreshold ) |
| { |
| this.spaceReclaimerThreshold = spaceReclaimerThreshold; |
| } |
| |
| |
| /** |
| * @see Object#toString() |
| */ |
| public String toString() |
| { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append( "RM free pages : [" ); |
| |
| if ( firstFreePage != NO_PAGE ) |
| { |
| long current = firstFreePage; |
| boolean isFirst = true; |
| |
| while ( current != NO_PAGE ) |
| { |
| if ( isFirst ) |
| { |
| isFirst = false; |
| } |
| else |
| { |
| sb.append( ", " ); |
| } |
| |
| PageIO pageIo; |
| |
| try |
| { |
| pageIo = fetchPage( current ); |
| sb.append( pageIo.getOffset() ); |
| current = pageIo.getNextPage(); |
| } |
| catch ( EndOfFileExceededException e ) |
| { |
| e.printStackTrace(); |
| } |
| catch ( IOException e ) |
| { |
| e.printStackTrace(); |
| } |
| |
| } |
| } |
| |
| sb.append( "]" ); |
| |
| return sb.toString(); |
| } |
| } |