blob: 2847004e4f49155479b812ffba458fe642973141 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
import java.io.File;
import java.util.List;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
// In-memory components.
private final LSMBTreeMutableComponent mutableComponent;
// For creating BTree's used in flush and merge.
private final LSMBTreeImmutableComponentFactory componentFactory;
// For creating BTree's used in bulk load. Different from diskBTreeFactory
// because it should have a different tuple writer in it's leaf frames.
private final LSMBTreeImmutableComponentFactory bulkLoadComponentFactory;
// Common for in-memory and on-disk components.
private final ITreeIndexFrameFactory insertLeafFrameFactory;
private final ITreeIndexFrameFactory deleteLeafFrameFactory;
private final IBinaryComparatorFactory[] cmpFactories;
public LSMBTree(IInMemoryBufferCache memBufferCache, IInMemoryFreePageManager memFreePageManager,
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory,
ITreeIndexFrameFactory deleteLeafFrameFactory, ILSMIndexFileManager fileManager,
TreeIndexFactory<BTree> diskBTreeFactory, TreeIndexFactory<BTree> bulkLoadBTreeFactory,
BloomFilterFactory bloomFilterFactory, IFileMapProvider diskFileMapProvider, int fieldCount,
IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
super(memFreePageManager, diskBTreeFactory.getBufferCache(), fileManager, diskFileMapProvider, mergePolicy,
opTrackerFactory, ioScheduler, ioOpCallbackProvider);
mutableComponent = new LSMBTreeMutableComponent(new BTree(memBufferCache,
((InMemoryBufferCache) memBufferCache).getFileMapProvider(), memFreePageManager, interiorFrameFactory,
insertLeafFrameFactory, cmpFactories, fieldCount, new FileReference(new File("membtree"))),
memFreePageManager);
this.insertLeafFrameFactory = insertLeafFrameFactory;
this.deleteLeafFrameFactory = deleteLeafFrameFactory;
this.cmpFactories = cmpFactories;
componentFactory = new LSMBTreeImmutableComponentFactory(diskBTreeFactory, bloomFilterFactory);
bulkLoadComponentFactory = new LSMBTreeImmutableComponentFactory(bulkLoadBTreeFactory, bloomFilterFactory);
}
@Override
public synchronized void create() throws HyracksDataException {
if (isActivated) {
throw new HyracksDataException("Failed to create the index since it is activated.");
}
fileManager.deleteDirs();
fileManager.createDirs();
componentsRef.get().clear();
}
@Override
public synchronized void activate() throws HyracksDataException {
if (isActivated) {
return;
}
((InMemoryBufferCache) mutableComponent.getBTree().getBufferCache()).open();
mutableComponent.getBTree().create();
mutableComponent.getBTree().activate();
List<ILSMComponent> immutableComponents = componentsRef.get();
immutableComponents.clear();
List<LSMComponentFileReferences> validFileReferences;
try {
validFileReferences = fileManager.cleanupAndGetValidFiles();
} catch (IndexException e) {
throw new HyracksDataException(e);
}
for (LSMComponentFileReferences lsmComonentFileReference : validFileReferences) {
LSMBTreeImmutableComponent component;
try {
component = createDiskComponent(componentFactory,
lsmComonentFileReference.getInsertIndexFileReference(),
lsmComonentFileReference.getBloomFilterFileReference(), false);
} catch (IndexException e) {
throw new HyracksDataException(e);
}
immutableComponents.add(component);
}
isActivated = true;
}
@Override
public synchronized void deactivate(boolean flushOnExit) throws HyracksDataException {
if (!isActivated) {
return;
}
if (flushOnExit) {
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
ioOpCallbackProvider.getIOOperationCallback(this));
ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(cb);
try {
cb.waitForIO();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
BTree btree = component.getBTree();
BloomFilter bloomFilter = component.getBloomFilter();
btree.deactivate();
bloomFilter.deactivate();
}
mutableComponent.getBTree().deactivate();
mutableComponent.getBTree().destroy();
((InMemoryBufferCache) mutableComponent.getBTree().getBufferCache()).close();
isActivated = false;
}
@Override
public synchronized void deactivate() throws HyracksDataException {
deactivate(true);
}
@Override
public void destroy() throws HyracksDataException {
if (isActivated) {
throw new HyracksDataException("Failed to destroy the index since it is activated.");
}
List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
component.getBTree().destroy();
component.getBloomFilter().destroy();
}
mutableComponent.getBTree().destroy();
fileManager.deleteDirs();
}
@Override
public void clear() throws HyracksDataException {
if (!isActivated) {
throw new HyracksDataException("Failed to clear the index since it is not activated.");
}
List<ILSMComponent> immutableComponents = componentsRef.get();
mutableComponent.getBTree().clear();
for (ILSMComponent c : immutableComponents) {
LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) c;
component.getBloomFilter().deactivate();
component.getBTree().deactivate();
component.getBloomFilter().destroy();
component.getBTree().destroy();
}
immutableComponents.clear();
}
@Override
public void getOperationalComponents(ILSMIndexOperationContext ctx) {
List<ILSMComponent> immutableComponents = componentsRef.get();
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
operationalComponents.clear();
switch (ctx.getOperation()) {
case UPDATE:
case UPSERT:
case PHYSICALDELETE:
case FLUSH:
case DELETE:
operationalComponents.add(mutableComponent);
break;
case SEARCH:
case INSERT:
operationalComponents.add(mutableComponent);
operationalComponents.addAll(immutableComponents);
break;
case MERGE:
operationalComponents.addAll(immutableComponents);
break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
}
@Override
public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException, IndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
switch (ctx.getOperation()) {
case PHYSICALDELETE:
ctx.memBTreeAccessor.delete(tuple);
break;
case INSERT:
insert(tuple, ctx);
break;
default:
ctx.memBTreeAccessor.upsert(tuple);
break;
}
mutableComponent.setIsModified();
}
private boolean insert(ITupleReference tuple, LSMBTreeOpContext ctx) throws HyracksDataException, IndexException {
MultiComparator comparator = MultiComparator.createIgnoreFieldLength(mutableComponent.getBTree()
.getComparatorFactories());
LSMBTreePointSearchCursor searchCursor = new LSMBTreePointSearchCursor(ctx);
IIndexCursor memCursor = new BTreeRangeSearchCursor(ctx.memBTreeOpCtx.leafFrame, false);
RangePredicate predicate = new RangePredicate(tuple, tuple, true, true, comparator, comparator);
// first check the inmemory component
ctx.memBTreeAccessor.search(memCursor, predicate);
try {
if (memCursor.hasNext()) {
memCursor.next();
LSMBTreeTupleReference lsmbtreeTuple = (LSMBTreeTupleReference) memCursor.getTuple();
if (!lsmbtreeTuple.isAntimatter()) {
throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
} else {
memCursor.close();
ctx.memBTreeAccessor.upsertIfConditionElseInsert(tuple, AntimatterAwareTupleAcceptor.INSTANCE);
return true;
}
}
} finally {
memCursor.close();
}
// TODO: Can we just remove the above code that search the mutable
// component and do it together with the search call below? i.e. instead
// of passing false to the lsmHarness.search(), we pass true to include
// the mutable component?
// the key was not in the inmemory component, so check the disk
// components
search(ctx, searchCursor, predicate);
try {
if (searchCursor.hasNext()) {
throw new BTreeDuplicateKeyException("Failed to insert key since key already exists.");
}
} finally {
searchCursor.close();
}
ctx.memBTreeAccessor.upsertIfConditionElseInsert(tuple, AntimatterAwareTupleAcceptor.INSTANCE);
return true;
}
@Override
public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
throws HyracksDataException, IndexException {
LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
int numBTrees = operationalComponents.size();
assert numBTrees > 0;
boolean includeMutableComponent = operationalComponents.get(0) == mutableComponent;
LSMBTreeCursorInitialState initialState = new LSMBTreeCursorInitialState(numBTrees, insertLeafFrameFactory,
ctx.cmp, ctx.bloomFilterCmp, includeMutableComponent, lsmHarness, ctx.memBTreeAccessor, pred,
ctx.searchCallback, operationalComponents);
cursor.open(initialState, pred);
}
@Override
public boolean scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!mutableComponent.isModified()) {
return false;
}
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
assert ctx.getComponentHolder().size() == 1;
ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
opCtx.setOperation(IndexOperation.FLUSH);
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
return true;
}
@Override
public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
LSMBTreeMutableComponent flushingComponent = (LSMBTreeMutableComponent) flushOp.getFlushingComponent();
IIndexAccessor accessor = flushingComponent.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor();
accessor.search(countingCursor, nullPred);
long numElements = 0L;
try {
while (countingCursor.hasNext()) {
countingCursor.next();
ITupleReference countTuple = countingCursor.getTuple();
numElements = IntegerSerializerDeserializer.getInt(countTuple.getFieldData(0),
countTuple.getFieldStart(0));
}
} finally {
countingCursor.close();
}
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
MAX_BLOOM_FILTER_ACCEPTABLE_FALSE_POSITIVE_RATE);
LSMBTreeImmutableComponent component = createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
flushOp.getBloomFilterFlushTarget(), true);
IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements);
IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements,
bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
IIndexCursor scanCursor = accessor.createSearchCursor();
accessor.search(scanCursor, nullPred);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
builder.add(scanCursor.getTuple());
bulkLoader.add(scanCursor.getTuple());
}
} finally {
scanCursor.close();
builder.end();
}
bulkLoader.end();
return component;
}
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
opCtx.getComponentHolder().addAll(mergingComponents);
ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
search(opCtx, cursor, rangePred);
opCtx.setOperation(IndexOperation.MERGE);
BTree firstBTree = (BTree) ((LSMBTreeImmutableComponent) mergingComponents.get(0)).getBTree();
BTree lastBTree = (BTree) ((LSMBTreeImmutableComponent) mergingComponents.get(mergingComponents.size() - 1))
.getBTree();
FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile()
.getName(), lastFile.getFile().getName());
ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
}
@Override
public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
throws HyracksDataException, IndexException {
LSMBTreeMergeOperation mergeOp = (LSMBTreeMergeOperation) operation;
ITreeIndexCursor cursor = mergeOp.getCursor();
mergedComponents.addAll(mergeOp.getMergingComponents());
long numElements = 0L;
for (int i = 0; i < mergedComponents.size(); ++i) {
numElements += ((LSMBTreeImmutableComponent) mergedComponents.get(i)).getBloomFilter().getNumElements();
}
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
MAX_BLOOM_FILTER_ACCEPTABLE_FALSE_POSITIVE_RATE);
LSMBTreeImmutableComponent mergedComponent = createDiskComponent(componentFactory,
mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements);
IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
builder.add(frameTuple);
bulkLoader.add(frameTuple);
}
} finally {
cursor.close();
builder.end();
}
bulkLoader.end();
return mergedComponent;
}
private LSMBTreeImmutableComponent createDiskComponent(LSMBTreeImmutableComponentFactory factory,
FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
throws HyracksDataException, IndexException {
// Create new BTree instance.
LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) factory
.createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
if (createComponent) {
component.getBTree().create();
component.getBloomFilter().create();
}
// BTree will be closed during cleanup of merge().
component.getBTree().activate();
component.getBloomFilter().activate();
return component;
}
@Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
throws TreeIndexException {
try {
return new LSMBTreeBulkLoader(fillLevel, verifyInput, numElementsHint);
} catch (HyracksDataException e) {
throw new TreeIndexException(e);
}
}
private ILSMComponent createBulkLoadTarget() throws HyracksDataException, IndexException {
LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
componentFileRefs.getBloomFilterFileReference(), true);
}
@Override
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException {
// The order of forcing the dirty page to be flushed is critical. The
// bloom filter must be always done first.
LSMBTreeImmutableComponent component = (LSMBTreeImmutableComponent) lsmComponent;
// Flush the bloom filter first.
int fileId = component.getBloomFilter().getFileId();
IBufferCache bufferCache = component.getBTree().getBufferCache();
int startPage = 0;
int maxPage = component.getBloomFilter().getNumPages();
forceFlushDirtyPages(bufferCache, fileId, startPage, maxPage);
forceFlushDirtyPages(component.getBTree());
markAsValidInternal(component.getBTree());
}
public class LSMBTreeBulkLoader implements IIndexBulkLoader {
private final ILSMComponent component;
private final BTreeBulkLoader bulkLoader;
private final IIndexBulkLoader builder;
private boolean endHasBeenCalled = false;
public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
throws TreeIndexException, HyracksDataException {
try {
component = createBulkLoadTarget();
} catch (HyracksDataException e) {
throw new TreeIndexException(e);
} catch (IndexException e) {
throw new TreeIndexException(e);
}
bulkLoader = (BTreeBulkLoader) ((LSMBTreeImmutableComponent) component).getBTree().createBulkLoader(
fillFactor, verifyInput, numElementsHint);
int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
MAX_BLOOM_FILTER_ACCEPTABLE_FALSE_POSITIVE_RATE);
builder = ((LSMBTreeImmutableComponent) component).getBloomFilter().createBuilder(numElementsHint,
bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
}
@Override
public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
try {
bulkLoader.add(tuple);
builder.add(tuple);
} catch (IndexException e) {
handleException();
throw e;
} catch (HyracksDataException e) {
handleException();
throw e;
} catch (RuntimeException e) {
handleException();
throw e;
}
}
protected void handleException() throws HyracksDataException, IndexException {
if (!endHasBeenCalled) {
builder.end();
}
((LSMBTreeImmutableComponent) component).getBTree().deactivate();
((LSMBTreeImmutableComponent) component).getBTree().destroy();
((LSMBTreeImmutableComponent) component).getBloomFilter().deactivate();
((LSMBTreeImmutableComponent) component).getBloomFilter().destroy();
}
@Override
public void end() throws HyracksDataException, IndexException {
bulkLoader.end();
builder.end();
endHasBeenCalled = true;
lsmHarness.addBulkLoadedComponent(component);
}
}
public LSMBTreeOpContext createOpContext(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMBTreeOpContext(mutableComponent.getBTree(), insertLeafFrameFactory, deleteLeafFrameFactory,
modificationCallback, searchCallback, componentFactory.getBloomFilterKeyFields().length);
}
@Override
public ILSMIndexAccessorInternal createAccessor(IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
return new LSMBTreeAccessor(lsmHarness, createOpContext(modificationCallback, searchCallback));
}
public class LSMBTreeAccessor extends LSMTreeIndexAccessor {
public LSMBTreeAccessor(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) {
super(lsmHarness, ctx);
}
@Override
public IIndexCursor createSearchCursor() {
return new LSMBTreeSearchCursor(ctx);
}
public MultiComparator getMultiComparator() {
LSMBTreeOpContext concreteCtx = (LSMBTreeOpContext) ctx;
return concreteCtx.cmp;
}
}
@Override
public IBufferCache getBufferCache() {
return diskBufferCache;
}
public IBinaryComparatorFactory[] getComparatorFactories() {
return cmpFactories;
}
@Override
public ITreeIndexFrameFactory getInteriorFrameFactory() {
return mutableComponent.getBTree().getInteriorFrameFactory();
}
@Override
public int getFieldCount() {
return mutableComponent.getBTree().getFieldCount();
}
@Override
public int getFileId() {
return mutableComponent.getBTree().getFileId();
}
@Override
public IFreePageManager getFreePageManager() {
return mutableComponent.getBTree().getFreePageManager();
}
@Override
public ITreeIndexFrameFactory getLeafFrameFactory() {
return mutableComponent.getBTree().getLeafFrameFactory();
}
@Override
public long getMemoryAllocationSize() {
InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getBTree().getBufferCache();
return memBufferCache.getNumPages() * memBufferCache.getPageSize();
}
@Override
public int getRootPageId() {
return mutableComponent.getBTree().getRootPageId();
}
public boolean isEmptyIndex() throws HyracksDataException {
List<ILSMComponent> immutableComponents = componentsRef.get();
return immutableComponents.isEmpty()
&& mutableComponent.getBTree().isEmptyTree(
mutableComponent.getBTree().getInteriorFrameFactory().createFrame());
}
@Override
public void validate() throws HyracksDataException {
mutableComponent.getBTree().validate();
List<ILSMComponent> immutableComponents = componentsRef.get();
for (ILSMComponent c : immutableComponents) {
BTree btree = (BTree) ((LSMBTreeImmutableComponent) c).getBTree();
btree.validate();
}
}
}