blob: a913b8180208841f1c4c6f77990527ab81a014b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.am.lsm.rtree.impls;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ICursorInitialState;
import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.api.IndexException;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import org.apache.hyracks.storage.am.rtree.impls.RTree;
import org.apache.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCursor {
private ITreeIndexAccessor[] mutableRTreeAccessors;
private ITreeIndexAccessor[] btreeAccessors;
private RTreeSearchCursor[] mutableRTreeCursors;
private ITreeIndexCursor[] btreeCursors;
private RangePredicate btreeRangePredicate;
private boolean foundNext;
private ITupleReference frameTuple;
private int[] comparatorFields;
private MultiComparator btreeCmp;
private int currentCursor;
private SearchPredicate rtreeSearchPredicate;
private int numMutableComponents;
private boolean open;
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
this(opCtx, false);
}
public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
super(opCtx, returnDeletedTuples);
currentCursor = 0;
}
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
IndexException {
LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
cmp = lsmInitialState.getHilbertCmp();
btreeCmp = lsmInitialState.getBTreeCmp();
lsmHarness = lsmInitialState.getLSMHarness();
comparatorFields = lsmInitialState.getComparatorFields();
operationalComponents = lsmInitialState.getOperationalComponents();
rtreeSearchPredicate = (SearchPredicate) searchPred;
includeMutableComponent = false;
numMutableComponents = 0;
int numImmutableComponents = 0;
for (ILSMComponent component : operationalComponents) {
if (component.getType() == LSMComponentType.MEMORY) {
includeMutableComponent = true;
numMutableComponents++;
} else {
numImmutableComponents++;
}
}
if (includeMutableComponent) {
btreeRangePredicate = new RangePredicate(null, null, true, true, btreeCmp, btreeCmp);
}
mutableRTreeCursors = new RTreeSearchCursor[numMutableComponents];
mutableRTreeAccessors = new ITreeIndexAccessor[numMutableComponents];
btreeCursors = new BTreeRangeSearchCursor[numMutableComponents];
btreeAccessors = new ITreeIndexAccessor[numMutableComponents];
for (int i = 0; i < numMutableComponents; i++) {
ILSMComponent component = operationalComponents.get(i);
RTree rtree = (RTree) ((LSMRTreeMemoryComponent) component).getRTree();
BTree btree = (BTree) ((LSMRTreeMemoryComponent) component).getBTree();
mutableRTreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) lsmInitialState
.getRTreeInteriorFrameFactory().createFrame(), (IRTreeLeafFrame) lsmInitialState
.getRTreeLeafFrameFactory().createFrame());
btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory()
.createFrame(), false);
btreeAccessors[i] = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
mutableRTreeAccessors[i] = rtree.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
}
rangeCursors = new RTreeSearchCursor[numImmutableComponents];
ITreeIndexAccessor[] immutableRTreeAccessors = new ITreeIndexAccessor[numImmutableComponents];
int j = 0;
for (int i = numMutableComponents; i < operationalComponents.size(); i++) {
ILSMComponent component = operationalComponents.get(i);
rangeCursors[j] = new RTreeSearchCursor((IRTreeInteriorFrame) lsmInitialState
.getRTreeInteriorFrameFactory().createFrame(), (IRTreeLeafFrame) lsmInitialState
.getRTreeLeafFrameFactory().createFrame());
RTree rtree = (RTree) ((LSMRTreeDiskComponent) component).getRTree();
immutableRTreeAccessors[j] = rtree.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
j++;
}
searchNextCursor();
setPriorityQueueComparator();
initPriorityQueue();
open = true;
}
private void searchNextCursor() throws HyracksDataException, IndexException {
if (currentCursor < numMutableComponents) {
mutableRTreeCursors[currentCursor].reset();
mutableRTreeAccessors[currentCursor].search(mutableRTreeCursors[currentCursor], rtreeSearchPredicate);
}
}
@Override
public boolean hasNext() throws HyracksDataException, IndexException {
if (includeMutableComponent) {
if (foundNext) {
return true;
}
while (currentCursor < numMutableComponents) {
while (mutableRTreeCursors[currentCursor].hasNext()) {
mutableRTreeCursors[currentCursor].next();
ITupleReference currentTuple = mutableRTreeCursors[currentCursor].getTuple();
if (searchMemBTrees(currentTuple, currentCursor)) {
foundNext = true;
frameTuple = currentTuple;
return true;
}
}
mutableRTreeCursors[currentCursor].close();
currentCursor++;
searchNextCursor();
}
while (super.hasNext()) {
super.next();
ITupleReference diskRTreeTuple = super.getTuple();
if (searchMemBTrees(diskRTreeTuple, numMutableComponents)) {
foundNext = true;
frameTuple = diskRTreeTuple;
return true;
}
}
} else {
return super.hasNext();
}
return false;
}
@Override
public void next() throws HyracksDataException {
if (includeMutableComponent) {
foundNext = false;
} else {
super.next();
}
}
@Override
public ITupleReference getTuple() {
if (includeMutableComponent) {
return frameTuple;
} else {
return super.getTuple();
}
}
@Override
public void reset() throws HyracksDataException, IndexException {
if (!open) {
return;
}
currentCursor = 0;
foundNext = false;
if (includeMutableComponent) {
for (int i = 0; i < numMutableComponents; i++) {
mutableRTreeCursors[i].reset();
btreeCursors[i].reset();
}
}
super.reset();
}
@Override
public void close() throws HyracksDataException {
if (!open) {
return;
}
if (includeMutableComponent) {
for (int i = 0; i < numMutableComponents; i++) {
mutableRTreeCursors[i].close();
btreeCursors[i].close();
}
}
currentCursor = 0;
open = false;
super.close();
}
@Override
protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB)
throws HyracksDataException {
return cmp.selectiveFieldCompare(tupleA, tupleB, comparatorFields);
}
private boolean searchMemBTrees(ITupleReference tuple, int lastBTreeToSearch) throws HyracksDataException,
IndexException {
for (int i = 0; i < lastBTreeToSearch; i++) {
btreeCursors[i].reset();
btreeRangePredicate.setHighKey(tuple, true);
btreeRangePredicate.setLowKey(tuple, true);
btreeAccessors[i].search(btreeCursors[i], btreeRangePredicate);
try {
if (btreeCursors[i].hasNext()) {
return false;
}
} finally {
btreeCursors[i].close();
}
}
return true;
}
@Override
protected void setPriorityQueueComparator() {
if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
pqCmp = new PriorityQueueHilbertComparator(cmp, comparatorFields);
}
}
public class PriorityQueueHilbertComparator extends PriorityQueueComparator {
private final int[] comparatorFields;
public PriorityQueueHilbertComparator(MultiComparator cmp, int[] comparatorFields) {
super(cmp);
this.comparatorFields = comparatorFields;
}
@Override
public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
int result;
try {
result = cmp.selectiveFieldCompare(elementA.getTuple(), elementB.getTuple(), comparatorFields);
if (result != 0) {
return result;
}
} catch (HyracksDataException e) {
throw new IllegalArgumentException(e);
}
if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
return 1;
} else {
return -1;
}
}
}
}