blob: 47d00c0cbccaebec2c39510aaa0a11de1d061d52 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCursor {
private RTreeSearchCursor memRTreeCursor;
private BTreeRangeSearchCursor memBTreeCursor;
private RangePredicate btreeRangePredicate;
private ITreeIndexAccessor memBTreeAccessor;
private boolean foundNext;
private ITupleReference frameTuple;
private int[] comparatorFields;
private MultiComparator btreeCmp;
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
super(opCtx);
}
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
cmp = lsmInitialState.getHilbertCmp();
btreeCmp = lsmInitialState.getBTreeCmp();
int numDiskRTrees = lsmInitialState.getNumberOfTrees();
rangeCursors = new RTreeSearchCursor[numDiskRTrees];
for (int i = 0; i < numDiskRTrees; i++) {
rangeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) lsmInitialState
.getRTreeInteriorFrameFactory().createFrame(), (IRTreeLeafFrame) lsmInitialState
.getRTreeLeafFrameFactory().createFrame());
}
includeMemComponent = lsmInitialState.getIncludeMemComponent();
operationalComponents = lsmInitialState.getOperationalComponents();
if (includeMemComponent) {
memRTreeCursor = new RTreeSearchCursor((IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory()
.createFrame(), (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
memBTreeCursor = new BTreeRangeSearchCursor((IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory()
.createFrame(), false);
memBTreeAccessor = lsmInitialState.getBTreeAccessors()[0];
btreeRangePredicate = new RangePredicate(null, null, true, true, btreeCmp, btreeCmp);
}
lsmHarness = lsmInitialState.getLSMHarness();
comparatorFields = lsmInitialState.getComparatorFields();
setPriorityQueueComparator();
}
@Override
public boolean hasNext() throws HyracksDataException, IndexException {
if (includeMemComponent) {
if (foundNext) {
return true;
}
while (memRTreeCursor.hasNext()) {
memRTreeCursor.next();
ITupleReference memRTreeTuple = memRTreeCursor.getTuple();
if (searchMemBTree(memRTreeTuple)) {
foundNext = true;
frameTuple = memRTreeTuple;
return true;
}
}
while (super.hasNext()) {
super.next();
ITupleReference diskRTreeTuple = super.getTuple();
if (searchMemBTree(diskRTreeTuple)) {
foundNext = true;
frameTuple = diskRTreeTuple;
return true;
}
}
} else {
return super.hasNext();
}
return false;
}
@Override
public void next() throws HyracksDataException {
if (includeMemComponent) {
foundNext = false;
} else {
super.next();
}
}
@Override
public ITupleReference getTuple() {
if (includeMemComponent) {
return frameTuple;
} else {
return super.getTuple();
}
}
@Override
public void reset() throws HyracksDataException, IndexException {
if (includeMemComponent) {
memRTreeCursor.reset();
memBTreeCursor.reset();
}
super.reset();
}
@Override
public void close() throws HyracksDataException {
if (includeMemComponent) {
memRTreeCursor.close();
memBTreeCursor.close();
}
super.close();
}
public ITreeIndexCursor getMemRTreeCursor() {
return memRTreeCursor;
}
@Override
protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB) {
return cmp.selectiveFieldCompare(tupleA, tupleB, comparatorFields);
}
private boolean searchMemBTree(ITupleReference tuple) throws HyracksDataException {
try {
btreeRangePredicate.setHighKey(tuple, true);
btreeRangePredicate.setLowKey(tuple, true);
memBTreeAccessor.search(memBTreeCursor, btreeRangePredicate);
} catch (IndexException e) {
throw new HyracksDataException(e);
}
try {
if (memBTreeCursor.hasNext()) {
return false;
} else {
return true;
}
} finally {
memBTreeCursor.close();
}
}
@Override
protected void setPriorityQueueComparator() {
if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
pqCmp = new PriorityQueueHilbertComparator(cmp, comparatorFields);
}
}
public class PriorityQueueHilbertComparator extends PriorityQueueComparator {
private final int[] comparatorFields;
public PriorityQueueHilbertComparator(MultiComparator cmp, int[] comparatorFields) {
super(cmp);
this.comparatorFields = comparatorFields;
}
@Override
public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
int result = cmp.selectiveFieldCompare(elementA.getTuple(), elementB.getTuple(), comparatorFields);
if (result != 0) {
return result;
}
if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
return 1;
} else {
return -1;
}
}
}
@Override
protected void checkPriorityQueue() throws HyracksDataException, IndexException {
while (!outputPriorityQueue.isEmpty() || needPush == true) {
if (!outputPriorityQueue.isEmpty()) {
PriorityQueueElement checkElement = outputPriorityQueue.peek();
// If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
if (isDeleted(checkElement)) {
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
outputElement = outputPriorityQueue.poll();
needPush = true;
} else {
break;
}
} else {
// Compare the previous tuple and the head tuple in the PQ
if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
// If the previous tuple and the head tuple are
// identical
// then pop the head tuple and push the next tuple from
// the tree of head tuple
// the head element of PQ is useless now
PriorityQueueElement e = outputPriorityQueue.poll();
pushIntoPriorityQueue(e);
} else {
// If the previous tuple and the head tuple are different
// the info of previous tuple is useless
if (needPush == true) {
pushIntoPriorityQueue(outputElement);
needPush = false;
}
outputElement = null;
}
}
} else {
// the priority queue is empty and needPush
pushIntoPriorityQueue(outputElement);
needPush = false;
outputElement = null;
}
}
}
}