blob: 5782a56a9c5069420115139db9120949bcb4de27 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
* A class used to bulk load a BTree. It will allow the load of N elements in
* a given BTree without having to inject them one by one, saving a lot of time.
* The second advantage is that the btree will be dense (the leaves will be
* complete, except the last one).
* This class can also be used to compact a BTree.
* @author <a href="">Apache Directory Project</a>
public class BulkLoader<K, V>
// Private constructor: the public entry points visible in this file (load, compact) are static.
private BulkLoader()
// Kind of level handled by computeLevel(): the callers in this file pass LevelEnum.LEAF or LevelEnum.NODE.
// NOTE(review): the list of constants is not visible in this view of the file - confirm upstream.
static enum LevelEnum
* A private class used to store the temporary sorted file. It's used by
* the bulkLoader
private static class SortedFile
/** the file that contains the values */
private File file;
/** The number of stored values */
private int nbValues;
/** A constructor for this class */
/*No Qualifier*/SortedFile( File file, int nbValues )
this.file = file;
this.nbValues = nbValues;
* Process the data, and create files to store them sorted if necessary, or keep them
* in memory.
* TODO: document the parameters and the returned value of readElements.
* @param btree
* @param iterator
* @param sortedFiles
* @param tuples
* @param chunkSize
* @return
* @throws IOException
private static <K, V> int readElements( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, List<File> sortedFiles,
List<Tuple<K, V>> tuples, int chunkSize ) throws IOException
int nbRead = 0;
int nbIteration = 0;
int nbElems = 0;
boolean inMemory = true;
Set<K> keys = new HashSet<K>();
while ( true )
// Read up to chukSize elements
while ( iterator.hasNext() && ( nbRead < chunkSize ) )
Tuple<K, V> tuple =;
tuples.add( tuple );
if ( !keys.contains( tuple.getKey() ) )
keys.add( tuple.getKey() );
if ( nbRead < chunkSize )
if ( nbIteration != 1 )
// Flush the sorted data on disk and exit
inMemory = false;
sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
// Update the number of read elements
nbElems += nbRead;
if ( !iterator.hasNext() )
// special case : we have exactly chunkSize elements in the incoming data
if ( nbIteration > 1 )
// Flush the sorted data on disk and exit
inMemory = false;
sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
// We have read all the data in one round trip, let's get out, no need
// to store the data on disk
// Update the number of read elements
nbElems += nbRead;
// We have read chunkSize elements, we have to sort them on disk
nbElems += nbRead;
nbRead = 0;
sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
if ( !inMemory )
return nbElems;
* Read all the sorted files, and inject them into one single big file containing all the
* sorted and merged elements.
* @throws IOException
private static <K, V> Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> processFiles( BTree<K, V> btree,
Iterator<Tuple<K, Set<V>>> dataIterator ) throws IOException
File file = File.createTempFile( "sortedUnique", "data" );
FileOutputStream fos = new FileOutputStream( file );
// Number of read elements
int nbReads = 0;
// Flush the tuples on disk
while ( dataIterator.hasNext() )
// grab a tuple
Tuple<K, Set<V>> tuple =;
// Serialize the key
byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
fos.write( IntSerializer.serialize( bytesKey.length ) );
fos.write( bytesKey );
// Serialize the number of values
int nbValues = tuple.getValue().size();
fos.write( IntSerializer.serialize( nbValues ) );
// Serialize the values
for ( V value : tuple.getValue() )
byte[] bytesValue = btree.getValueSerializer().serialize( value );
// Serialize the value
fos.write( IntSerializer.serialize( bytesValue.length ) );
fos.write( bytesValue );
FileInputStream fis = new FileInputStream( file );
Iterator<Tuple<K, Set<V>>> uniqueIterator = createUniqueFileIterator( btree, fis );
SortedFile sortedFile = new SortedFile( file, nbReads );
Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = new Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile>(
uniqueIterator, sortedFile );
return result;
* Bulk Load data into a persisted BTree
* @param btree The persisted BTree in which we want to load the data
* @param iterator The iterator over the data to bulkload
* @param chunkSize The number of elements we may store in memory at each iteration
* @throws IOException If there is a problem while processing the data
public static <K, V> BTree<K, V> load( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, int chunkSize )
throws IOException
if ( btree == null )
throw new RuntimeException( "Invalid BTree : it's null" );
if ( iterator == null )
// Nothing to do...
return null;
// Iterate through the elements by chunk
boolean inMemory = true;
// The list of files we will use to store the sorted chunks
List<File> sortedFiles = new ArrayList<File>();
// An array of chukSize tuple max
List<Tuple<K, V>> tuples = new ArrayList<Tuple<K, V>>( chunkSize );
// Now, start to read all the tuples to sort them. We may use intermediate files
// for that purpose if we hit the threshold.
int nbElems = readElements( btree, iterator, sortedFiles, tuples, chunkSize );
// If the tuple list is empty, we have to process the load based on files, not in memory
if ( nbElems > 0 )
inMemory = tuples.size() > 0;
// Now that we have processed all the data, we can start storing them in the btree
Iterator<Tuple<K, Set<V>>> dataIterator = null;
FileInputStream[] streams = null;
BTree<K, V> resultBTree = null;
if ( inMemory )
// Here, we have all the data in memory, no need to merge files
// We will build a simple iterator over the data
dataIterator = createTupleIterator( btree, tuples );
resultBTree = bulkLoad( btree, dataIterator, nbElems );
// We first have to build an iterator over the files
int nbFiles = sortedFiles.size();
streams = new FileInputStream[nbFiles];
for ( int i = 0; i < nbFiles; i++ )
streams[i] = new FileInputStream( sortedFiles.get( i ) );
dataIterator = createIterator( btree, streams );
// Process the files, and construct one single file with an iterator
Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = processFiles( btree, dataIterator );
resultBTree = bulkLoad( btree, result.key, result.value.nbValues );
// Ok, we have an iterator over sorted elements, we can now load them in the
// target btree.
// Now, close the FileInputStream, and delete them if we have some
if ( !inMemory )
int nbFiles = sortedFiles.size();
for ( int i = 0; i < nbFiles; i++ )
sortedFiles.get( i ).delete();
return resultBTree;
* Creates a node leaf LevelInfo based on the number of elements in the lower level. We can store
* up to PageSize + 1 references to pages in a node.
// Builds the LevelInfo describing one level of the tree for nbElems entries, and
// creates the first page of that level.
// A page of a NODE level is counted as holding pageSize + 1 entries (incrementNode = 1),
// a page of a LEAF level as holding pageSize entries.
// NOTE(review): braces and else keywords are not visible in this view of the file, so
// the nesting of the branches below has to be confirmed against the upstream source.
/* no qualifier*/static <K, V> LevelInfo<K, V> computeLevel( BTree<K, V> btree, int nbElems, LevelEnum levelType )
int pageSize = btree.getPageSize();
int incrementNode = 0;
if ( levelType == LevelEnum.NODE )
incrementNode = 1;
LevelInfo<K, V> level = new LevelInfo<K, V>();
level.setType( ( levelType == LevelEnum.NODE ) );
level.setNbElems( nbElems );
// Integer division : counts the pages that can be completely filled
level.setNbPages( nbElems / ( pageSize + incrementNode ) );
level.setLevelNumber( 0 );
level.setNbAddedElems( 0 );
level.setCurrentPos( 0 );
// Create the first level page
// Case where all the elements fit in one single page
if ( nbElems <= pageSize + incrementNode )
if ( nbElems % ( pageSize + incrementNode ) != 0 )
level.setNbPages( 1 );
level.setNbElemsLimit( nbElems );
// The node page is created with one element less than the number of entries
if ( level.isNode() )
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, nbElems - 1 ) );
level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbElems ) );
// Number of elements left once the complete pages have been accounted for
int remaining = nbElems % ( pageSize + incrementNode );
// No remaining element : every page of the level is complete
if ( remaining == 0 )
level.setNbElemsLimit( nbElems );
if ( level.isNode() )
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
// Fewer remaining elements than half a page : the limit is moved back by one more page
if ( remaining < ( pageSize / 2 ) + incrementNode )
level.setNbElemsLimit( nbElems - remaining - ( pageSize + incrementNode ) );
if ( level.getNbElemsLimit() > 0 )
if ( level.isNode() )
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
if ( level.isNode() )
// NOTE(review): the receiver of the call below is missing in this view of the file
.setCurrentPage( BTreeFactory.createNode( btree, 0L, ( pageSize / 2 ) + remaining - 1 ) );
level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, ( pageSize / 2 ) + remaining ) );
// Otherwise the limit is the number of elements stored in the complete pages
level.setNbElemsLimit( nbElems - remaining );
if ( level.isNode() )
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
return level;
* Compute the number of pages necessary to store all the elements per level. The resulting list is
* reversed (i.e. the leaves are on the left, the root page on the right).
/* No Qualifier */static <K, V> List<LevelInfo<K, V>> computeLevels( BTree<K, V> btree, int nbElems )
List<LevelInfo<K, V>> levelList = new ArrayList<LevelInfo<K, V>>();
// Compute the leaves info
LevelInfo<K, V> leafLevel = computeLevel( btree, nbElems, LevelEnum.LEAF );
levelList.add( leafLevel );
int nbPages = leafLevel.getNbPages();
int levelNumber = 1;
while ( nbPages > 1 )
// Compute the Nodes info
LevelInfo<K, V> nodeLevel = computeLevel( btree, nbPages, LevelEnum.NODE );
nodeLevel.setLevelNumber( levelNumber++ );
levelList.add( nodeLevel );
nbPages = nodeLevel.getNbPages();
return levelList;
* Inject a tuple into a leaf
// Stores the key of the tuple, and its values when the btree type requires it, in the
// current page of the leaf level, at the level's current position.
// NOTE(review): nothing visible here advances the level's current position or its count
// of added elements, although the callers test both - confirm upstream.
private static <K, V> void injectInLeaf( BTree<K, V> btree, Tuple<K, Set<V>> tuple, LevelInfo<K, V> leafLevel )
// The current page of the leaf level is handled as a PersistedLeaf
PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) leafLevel.getCurrentPage();
KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
leaf.setKey( leafLevel.getCurrentPos(), keyHolder );
// The values are stored for every btree type except PERSISTED_SUB
if ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB )
// Unchecked cast : Set.toArray() returns an Object[]
ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
leaf.setValue( leafLevel.getCurrentPos(), valueHolder );
private static <K, V> int computeNbElemsLeaf( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
int pageSize = btree.getPageSize();
int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
if ( remaining < pageSize )
return remaining;
else if ( remaining == pageSize )
return pageSize;
else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
return pageSize;
return remaining - pageSize / 2;
* Compute the number of nodes necessary to store all the elements.
/* No qualifier */int computeNbElemsNode( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
int pageSize = btree.getPageSize();
int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
if ( remaining < pageSize + 1 )
return remaining;
else if ( remaining == pageSize + 1 )
return pageSize + 1;
else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
return pageSize + 1;
return remaining - pageSize / 2;
* Inject a page reference into the root page.
// Adds a reference to the given page into the current page of the level, handled as
// the root node. Once the level has received all its elements, the node is written
// through the RecordManager and set as the root page of the btree.
// NOTE(review): the else keyword and the counter updates are not visible in this view;
// the comments below describe the apparent intent only.
private static <K, V> void injectInRoot( BTree<K, V> btree, Page<K, V> page, PageHolder<K, V> pageHolder,
LevelInfo<K, V> level ) throws IOException
PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
// Very first reference of the node : only the page holder is stored
if ( ( level.getCurrentPos() == 0 ) && ( node.getPage( 0 ) == null ) )
node.setPageHolder( 0, pageHolder );
// Inject the pageHolder and the page leftmost key
node.setPageHolder( level.getCurrentPos() + 1, pageHolder );
KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
node.setKey( level.getCurrentPos(), keyHolder );
// Check that we haven't added the last element. If so,
// we have to write the page on disk and update the btree
if ( level.getNbAddedElems() == level.getNbElems() )
PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
btree, node, 0L );
( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
* Inject a page reference into a Node. This method will recurse if needed.
// Writes the given page through the RecordManager, then stores a reference to it in the
// current node of the level found at levelIndex. When that node is complete, the method
// calls itself with levelIndex + 1 to register the node in the level above.
// NOTE(review): braces, else keywords and the counter updates announced by the
// "increment this level nb of added elements" comments are not visible in this view of
// the file; the nesting of the branches must be confirmed against the upstream source.
private static <K, V> void injectInNode( BTree<K, V> btree, Page<K, V> page, List<LevelInfo<K, V>> levels,
int levelIndex )
throws IOException
int pageSize = btree.getPageSize();
LevelInfo<K, V> level = levels.get( levelIndex );
PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
// We first have to write the page on disk
PageHolder<K, V> pageHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, page, 0L );
// First deal with a level whose elements all fit in one node (at most pageSize + 1).
// It will become the root node.
if ( level.getNbElems() <= pageSize + 1 )
injectInRoot( btree, page, pageHolder, level );
// Now, we have some parent node. We process the 3 different use cases :
// o Full node before the limit
// o Node over the limit but with at least N/2 elements
// o Node over the limit but with elements spread into 2 nodes
if ( level.getNbAddedElems() < level.getNbElemsLimit() )
// Ok, we haven't yet reached the incomplete pages (if any).
// Let's add the page reference into the node
// There is one special case : when we are adding the very first page
// reference into a node. In this case, we don't store the key
if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
node.setPageHolder( 0, pageHolder );
// Inject the pageHolder and the page leftmost key
node.setPageHolder( level.getCurrentPos(), pageHolder );
KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
node.setKey( level.getCurrentPos() - 1, keyHolder );
// Now, increment this level nb of added elements
// Check that we haven't added the last element. If so,
// we have to write the page on disk and update the parent's node
if ( level.getNbAddedElems() == level.getNbElems() )
//PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
// btree, node, 0L );
//( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
injectInNode( btree, node, levels, levelIndex + 1 );
// Check that we haven't completed the current node, and that this is not the root node.
if ( ( level.getCurrentPos() == pageSize + 1 ) && ( level.getLevelNumber() < levels.size() - 1 ) )
// yes. We have to write the node on disk, update its parent
// and create a new current node
injectInNode( btree, node, levels, levelIndex + 1 );
// The page is full, we have to create a new one, with a size depending on the remaining elements
if ( level.getNbAddedElems() < level.getNbElemsLimit() )
// We haven't reached the limit, create a new full node
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
else if ( level.getNbElems() - level.getNbAddedElems() <= pageSize )
level.setCurrentPage( BTreeFactory.createNode( btree, 0L,
level.getNbElems() - level.getNbAddedElems() - 1 ) );
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, ( level.getNbElems() - 1 )
- ( level.getNbAddedElems() + 1 ) - pageSize / 2 ) );
level.setCurrentPos( 0 );
// We are dealing with the last page or the last two pages
// We can have either one single page which can contain up to pageSize-1 elements
// or two pages, the first one containing ( nbElems - limit ) - pageSize/2 elements
// and the second one containing pageSize/2 elements.
if ( level.getNbElems() - level.getNbElemsLimit() > pageSize )
// As the remaining elements are above a page size, they will be spread across
// two pages. We have two cases here, depending on the page we are filling
if ( level.getNbElems() - level.getNbAddedElems() <= pageSize / 2 + 1 )
// As we have less than PageSize/2 elements to write, we are on the second page
if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
node.setPageHolder( 0, pageHolder );
// Inject the pageHolder and the page leftmost key
node.setPageHolder( level.getCurrentPos(), pageHolder );
KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
page.getLeftMostKey() );
node.setKey( level.getCurrentPos() - 1, keyHolder );
// Now, increment this level nb of added elements
// Check if we are done with the page
if ( level.getNbAddedElems() == level.getNbElems() )
// Yes, we have to update the parent
injectInNode( btree, node, levels, levelIndex + 1 );
// This is the first page
if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
// First element of the page
node.setPageHolder( 0, pageHolder );
// Any other following elements
// Inject the pageHolder and the page leftmost key
node.setPageHolder( level.getCurrentPos(), pageHolder );
KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
page.getLeftMostKey() );
node.setKey( level.getCurrentPos() - 1, keyHolder );
// Now, increment this level nb of added elements
// Check if we are done with the page
if ( level.getCurrentPos() == node.getNbElems() + 1 )
// Yes, we have to update the parent
injectInNode( btree, node, levels, levelIndex + 1 );
// And create a new one
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
level.setCurrentPos( 0 );
// Two cases : we don't have anything else to write, or this is a single page
if ( level.getNbAddedElems() == level.getNbElems() )
// We are done with the page
injectInNode( btree, node, levels, levelIndex + 1 );
// We have some more elements to add in the page
// This is the first page
if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
// First element of the page
node.setPageHolder( 0, pageHolder );
// Any other following elements
// Inject the pageHolder and the page leftmost key
node.setPageHolder( level.getCurrentPos(), pageHolder );
KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
page.getLeftMostKey() );
node.setKey( level.getCurrentPos() - 1, keyHolder );
// Now, increment this level nb of added elements
// Check if we are done with the page
if ( level.getCurrentPos() == node.getNbElems() + 1 )
// Yes, we have to update the parent
injectInNode( btree, node, levels, levelIndex + 1 );
// And create a new one
level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
level.setCurrentPos( 0 );
// Loads all the tuples into the root page of the btree : the caller (bulkLoad) uses this
// method when nbElems is not greater than the page size.
// Returns the btree received as a parameter, after having set nbElems in its header.
// NOTE(review): the case labels of the two switch statements and the increment of pos
// are not visible in this view of the file - confirm upstream.
private static <K, V> BTree<K, V> bulkLoadSinglePage( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator,
int nbElems ) throws IOException
// Use the root page
Page<K, V> rootPage = btree.getRootPage();
// Initialize the root page
( ( AbstractPage<K, V> ) rootPage ).setNbElems( nbElems );
// Generic array creation : these two assignments are unchecked
KeyHolder<K>[] keys = new KeyHolder[nbElems];
ValueHolder<V>[] values = new ValueHolder[nbElems];
// The values array is installed on the leaf implementation selected by the btree type
switch ( btree.getType() )
( ( InMemoryLeaf<K, V> ) rootPage ).values = values;
( ( PersistedLeaf<K, V> ) rootPage ).values = values;
// We first have to inject data into the page
int pos = 0;
while ( dataIterator.hasNext() )
// NOTE(review): the right-hand side of this assignment is missing in this view of the file
Tuple<K, Set<V>> tuple =;
// Store the current element in the rootPage
// NOTE(review): a PersistedKeyHolder is created whatever the btree type is - confirm
// this is intended for in-memory btrees
KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
keys[pos] = keyHolder;
// The value holder implementation follows the btree type
switch ( btree.getType() )
ValueHolder<V> valueHolder = new InMemoryValueHolder<V>( btree, ( V[] ) tuple.getValue()
.toArray() );
( ( InMemoryLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue()
.toArray() );
( ( PersistedLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
// Update the rootPage
( ( AbstractPage<K, V> ) rootPage ).setKeys( keys );
// Update the btree with the nb of added elements, and write it
BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
btreeHeader.setNbElems( nbElems );
return btree;
* Construct the target BTree from the sorted data. We will use the number of elements
* to determine the structure of the BTree, as it must be balanced
// Builds the btree from an iterator over sorted tuples. When all the elements fit in one
// page the work is delegated to bulkLoadSinglePage; otherwise the levels are computed
// with computeLevels, the leaves are filled one tuple at a time with injectInLeaf, and
// each completed leaf is registered in the level above with injectInNode.
// Returns the btree received as a parameter, after having set nbElems in its header.
// NOTE(review): braces, else/break keywords and the decrements of nbToAdd are not visible
// in this view of the file - the loop exits must be confirmed against the upstream source.
private static <K, V> BTree<K, V> bulkLoad( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
throws IOException
int pageSize = btree.getPageSize();
// Special case : we can store all the elements in a single page
if ( nbElems <= pageSize )
return bulkLoadSinglePage( btree, dataIterator, nbElems );
// Ok, we will need more than one page to store the elements, which
// means we also will need more than one level.
// First, compute the needed number of levels.
List<LevelInfo<K, V>> levels = computeLevels( btree, nbElems );
// Now, let's fill the levels
// The leaf level is the first one in the list
LevelInfo<K, V> leafLevel = levels.get( 0 );
while ( dataIterator.hasNext() )
// let's fill page up to the point all the complete pages have been filled
if ( leafLevel.getNbAddedElems() < leafLevel.getNbElemsLimit() )
// grab a tuple
// NOTE(review): the right-hand side of this assignment, and of the similar ones
// below, is missing in this view of the file
Tuple<K, Set<V>> tuple =;
injectInLeaf( btree, tuple, leafLevel );
// The page is completed, update the parent's node and create a new current page
if ( leafLevel.getCurrentPos() == pageSize )
injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
// The page is full, we have to create a new one
leafLevel.setCurrentPage( BTreeFactory
.createLeaf( btree, 0L, computeNbElemsLeaf( btree, leafLevel ) ) );
leafLevel.setCurrentPos( 0 );
// We have to deal with uncompleted pages now (if we have any)
if ( leafLevel.getNbAddedElems() == nbElems )
// First use case : we have injected all the elements in the btree : get out
if ( nbElems - leafLevel.getNbElemsLimit() > pageSize )
// Second use case : the number of elements after the limit does not
// fit in a page, that means we have to split it into
// two pages
// First page will contain nbElems - leafLevel.nbElemsLimit - PageSize/2 elements
int nbToAdd = nbElems - leafLevel.getNbElemsLimit() - pageSize / 2;
while ( nbToAdd > 0 )
// grab a tuple
Tuple<K, Set<V>> tuple =;
injectInLeaf( btree, tuple, leafLevel );
// Now inject the page into the node
Page<K, V> currentPage = leafLevel.getCurrentPage();
injectInNode( btree, currentPage, levels, 1 );
// Create a new page for the remaining elements
nbToAdd = pageSize / 2;
leafLevel.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbToAdd ) );
leafLevel.setCurrentPos( 0 );
while ( nbToAdd > 0 )
// grab a tuple
Tuple<K, Set<V>> tuple =;
injectInLeaf( btree, tuple, leafLevel );
// And update the parent node
Page<K, V> levelCurrentPage = leafLevel.getCurrentPage();
injectInNode( btree, levelCurrentPage, levels, 1 );
// We are done
// Third use case : we can push all the elements in the last page.
// Let's do it
int nbToAdd = nbElems - leafLevel.getNbElemsLimit();
while ( nbToAdd > 0 )
// grab a tuple
Tuple<K, Set<V>> tuple =;
injectInLeaf( btree, tuple, leafLevel );
// Now inject the page into the node
injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
// and we are done
// Update the btree with the nb of added elements, and write it
BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
btreeHeader.setNbElems( nbElems );
return btree;
* Flush a list of tuples to disk after having sorted them. In the process, we may have to gather the values
* for the tuples having the same keys.
* @throws IOException
private static <K, V> File flushToDisk( int fileNb, List<Tuple<K, V>> tuples, BTree<K, V> btree )
throws IOException
// Sort the tuples.
Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
File file = File.createTempFile( "sorted", Integer.toString( fileNb ) );
FileOutputStream fos = new FileOutputStream( file );
// Flush the tuples on disk
for ( Tuple<K, Set<V>> tuple : sortedTuples )
// Serialize the key
byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
fos.write( IntSerializer.serialize( bytesKey.length ) );
fos.write( bytesKey );
// Serialize the number of values
int nbValues = tuple.getValue().size();
fos.write( IntSerializer.serialize( nbValues ) );
// Serialize the values
for ( V value : tuple.getValue() )
byte[] bytesValue = btree.getValueSerializer().serialize( value );
// Serialize the value
fos.write( IntSerializer.serialize( bytesValue.length ) );
fos.write( bytesValue );
return file;
* Sort a list of tuples, eliminating the duplicate keys and storing the values in a set when we
* have a duplicate key
private static <K, V> Tuple<K, Set<V>>[] sort( BTree<K, V> btree, List<Tuple<K, V>> tuples )
Comparator<Tuple<K, Set<V>>> tupleComparator = new TupleComparator( btree.getKeyComparator(), btree
.getValueComparator() );
// Sort the list
Tuple<K, V>[] tuplesArray = ( Tuple<K, V>[] ) tuples.toArray( new Tuple[]
{} );
// First, eliminate the equals keys. We use a map for that
Map<K, Set<V>> mapTuples = new HashMap<K, Set<V>>();
for ( Tuple<K, V> tuple : tuplesArray )
// Is the key present in the map ?
Set<V> foundSet = mapTuples.get( tuple.key );
if ( foundSet != null )
// We already have had such a key, add the value to the existing key
foundSet.add( tuple.value );
// No such key present in the map : create a new set to store the values,
// and add it in the map associated with the new key
Set<V> set = new TreeSet<V>();
set.add( tuple.value );
mapTuples.put( tuple.key, set );
// Now, sort the map, by extracting all the key/values from the map
int size = mapTuples.size();
Tuple<K, Set<V>>[] sortedTuples = new Tuple[size];
int pos = 0;
// We create an array containing all the elements
for ( Map.Entry<K, Set<V>> entry : mapTuples.entrySet() )
sortedTuples[pos] = new Tuple<K, Set<V>>();
sortedTuples[pos].key = entry.getKey();
sortedTuples[pos].value = entry.getValue();
// And we sort the array
Arrays.sort( sortedTuples, tupleComparator );
return sortedTuples;
* Build an iterator over an array of sorted tuples, in memory
private static <K, V> Iterator<Tuple<K, Set<V>>> createTupleIterator( BTree<K, V> btree, List<Tuple<K, V>> tuples )
final Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
private int pos = 0;
public Tuple<K, Set<V>> next()
// Return the current tuple, if any
if ( pos < sortedTuples.length )
Tuple<K, Set<V>> tuple = sortedTuples[pos];
return tuple;
return null;
public boolean hasNext()
return pos < sortedTuples.length;
public void remove()
return tupleIterator;
private static <K, V> Tuple<K, Set<V>> fetchTuple( BTree<K, V> btree, FileInputStream fis )
if ( fis.available() == 0 )
return null;
Tuple<K, Set<V>> tuple = new Tuple<K, Set<V>>();
tuple.value = new TreeSet<V>();
byte[] intBytes = new byte[4];
// Read the key length intBytes );
int keyLength = IntSerializer.deserialize( intBytes );
// Read the key
byte[] keyBytes = new byte[keyLength]; keyBytes );
K key = btree.getKeySerializer().fromBytes( keyBytes );
tuple.key = key;
// get the number of values intBytes );
int nbValues = IntSerializer.deserialize( intBytes );
// Serialize the values
for ( int i = 0; i < nbValues; i++ )
// Read the value length intBytes );
int valueLength = IntSerializer.deserialize( intBytes );
// Read the value
byte[] valueBytes = new byte[valueLength]; valueBytes );
V value = btree.getValueSerializer().fromBytes( valueBytes );
tuple.value.add( value );
return tuple;
catch ( IOException ioe )
return null;
* Build an iterator over an array of sorted tuples, from files on the disk
* @throws FileNotFoundException
// Builds an iterator merging the tuples read from several streams. One tuple per stream
// is kept in readTuples, and the candidates map is keyed by ( key, stream index ) tuples
// ordered by a TupleComparator built from the btree key comparator and IntComparator.
// The iterator's next() removes the first candidate, returns the tuple that was pending
// for its stream, and fetches the next tuple from that same stream.
// NOTE(review): braces, else/break keywords and the bodies of remove() are not visible in
// this view of the file - the loop exits must be confirmed against the upstream source.
private static <K, V> Iterator<Tuple<K, Set<V>>> createIterator( final BTree<K, V> btree,
final FileInputStream[] streams )
throws FileNotFoundException
// The number of files we have to read from
final int nbFiles = streams.length;
// We will read only one element at a time from each file
final Tuple<K, Set<V>>[] readTuples = new Tuple[nbFiles];
final TreeMap<Tuple<K, Integer>, Set<V>> candidates =
new TreeMap<Tuple<K, Integer>, Set<V>>(
new TupleComparator<K, Integer>( btree.getKeyComparator(), IntComparator.INSTANCE ) );
// Read the tuple from each files
for ( int i = 0; i < nbFiles; i++ )
while ( true )
// fetchTuple returns null when the stream is exhausted or cannot be read
readTuples[i] = fetchTuple( btree, streams[i] );
if ( readTuples[i] != null )
Tuple<K, Integer> candidate = new Tuple<K, Integer>( readTuples[i].key, i, btree.getKeySerializer()
.getComparator() );
if ( !candidates.containsKey( candidate ) )
candidates.put( candidate, readTuples[i].getValue() );
// We have to merge the pulled tuple with the existing one, and read one more tuple
Set<V> oldValues = candidates.get( candidate );
oldValues.addAll( readTuples[i].getValue() );
Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
public Tuple<K, Set<V>> next()
// Get the first candidate
// NOTE(review): TreeMap.firstKey() throws NoSuchElementException on an empty map,
// so next() must only be called after hasNext() returned true
Tuple<K, Integer> tupleCandidate = candidates.firstKey();
// Remove it from the set
candidates.remove( tupleCandidate );
// Get the next tuple from the stream we just got the tuple from
Tuple<K, Set<V>> tuple = readTuples[tupleCandidate.value];
// fetch the next tuple from the file we just read the candidate from
while ( true )
// fetch it from the disk and store it into its reader
readTuples[tupleCandidate.value] = fetchTuple( btree, streams[tupleCandidate.value] );
if ( readTuples[tupleCandidate.value] == null )
// No more tuple for this file
if ( readTuples[tupleCandidate.value] != null )
// And store it into the candidate set
// NOTE(review): the map is keyed by Tuple<K, Integer> but is looked up here with a
// Tuple<K, Set<V>> - confirm that the comparator handles this as intended
Set<V> oldValues = candidates.get( readTuples[tupleCandidate.value] );
if ( oldValues != null )
// We already have another element with the same key, merge them
oldValues.addAll( readTuples[tupleCandidate.value].value );
Tuple<K, Integer> newTuple = new Tuple<K, Integer>( readTuples[tupleCandidate.value].key,
tupleCandidate.value );
candidates.put( newTuple, readTuples[tupleCandidate.value].getValue() );
// and exit the loop
// We can now return the found value
return tuple;
public boolean hasNext()
// Check that we have at least one element to read
return !candidates.isEmpty();
public void remove()
return tupleIterator;
* Build an iterator over the sorted tuples stored in a single file on the disk
* @throws FileNotFoundException
private static <K, V> Iterator<Tuple<K, Set<V>>> createUniqueFileIterator( final BTree<K, V> btree,
final FileInputStream stream )
throws FileNotFoundException
Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
boolean hasNext = true;
public Tuple<K, Set<V>> next()
// Get the tuple from the stream
Tuple<K, Set<V>> tuple = fetchTuple( btree, stream );
// We can now return the found value
return tuple;
public boolean hasNext()
// Check that we have at least one element to read
return stream.available() > 0;
catch ( IOException e )
return false;
public void remove()
return tupleIterator;
* Compact a given persisted BTree, making it dense. All the values will be stored
* in newly created pages, each one of them containing as many elements
* as its size allows.
* </br>
* The RecordManager will be stopped and restarted, do not use this method
* on a running BTree.
* @param recordManager The associated recordManager
* @param btree The BTree to compact
// NOTE(review): the body of this method is not visible in this view of the file, so
// what it actually does with the recordManager and the btree cannot be confirmed here.
public static void compact( RecordManager recordManager, BTree<?, ?> btree )
* Compact a given in-memory BTree, making it dense. All the values will be stored
* in newly created pages, each one of them containing as many elements
* as its size allows.
* </br>
* @param btree The BTree to compact
* @throws KeyNotFoundException
* @throws IOException
// Prepares an in-memory btree configuration copied from the given btree (name, page size,
// serializers, duplicates flag, read timeout, write buffer size and file path, if any),
// then wraps a cursor browsing the given btree into an Iterator of tuples.
// NOTE(review): the try blocks, the statements reading the cursor in next() and the final
// call that consumes tupleItr are truncated in this view of the file - confirm upstream.
public static BTree<?, ?> compact( BTree<?, ?> btree ) throws IOException, KeyNotFoundException
// First, create a new BTree which will contain all the elements
InMemoryBTreeConfiguration configuration = new InMemoryBTreeConfiguration();
configuration.setName( btree.getName() );
configuration.setPageSize( btree.getPageSize() );
configuration.setKeySerializer( btree.getKeySerializer() );
configuration.setValueSerializer( btree.getValueSerializer() );
configuration.setAllowDuplicates( btree.isAllowDuplicates() );
configuration.setReadTimeOut( btree.getReadTimeOut() );
configuration.setWriteBufferSize( btree.getWriteBufferSize() );
// The given btree is cast to InMemoryBTree without a prior type check
File file = ( ( InMemoryBTree ) btree ).getFile();
if ( file != null )
configuration.setFilePath( file.getPath() );
// Create a new Btree Builder
InMemoryBTreeBuilder btreeBuilder = new InMemoryBTreeBuilder( configuration );
// Create a cursor over the existing btree
final TupleCursor cursor = btree.browse();
// Create an iterator that will iterate the existing btree
Iterator<Tuple> tupleItr = new Iterator<Tuple>()
public Tuple next()
// Both failures are reported to the caller as a null tuple
catch ( EndOfFileExceededException e )
return null;
catch ( IOException e )
return null;
public boolean hasNext()
return cursor.hasNext();
// Both failures are reported to the caller as "no more element"
catch ( EndOfFileExceededException e )
return false;
catch ( IOException e )
return false;
public void remove()
// And finally, compact the btree
// NOTE(review): the beginning of the expression below is missing in this view of the file
return tupleItr );