blob: 7f08ba4fbb193bdedc694ba97c54612999718d5d [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
public abstract class LSMIndexSearchCursor implements ITreeIndexCursor {
protected PriorityQueueElement outputElement;
protected IIndexCursor[] rangeCursors;
protected PriorityQueue<PriorityQueueElement> outputPriorityQueue;
protected PriorityQueueComparator pqCmp;
protected MultiComparator cmp;
protected boolean needPush;
protected boolean includeMemComponent;
protected ILSMHarness lsmHarness;
protected final ILSMIndexOperationContext opCtx;
protected List<ILSMComponent> operationalComponents;
public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
this.opCtx = opCtx;
outputElement = null;
needPush = false;
}
public void initPriorityQueue() throws HyracksDataException, IndexException {
int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
for (int i = 0; i < rangeCursors.length; i++) {
pushIntoPriorityQueue(new PriorityQueueElement(i));
}
}
public IIndexCursor getCursor(int cursorIndex) {
return rangeCursors[cursorIndex];
}
@Override
public void reset() throws HyracksDataException, IndexException {
outputElement = null;
needPush = false;
if (outputPriorityQueue != null) {
outputPriorityQueue.clear();
}
if (rangeCursors != null) {
for (int i = 0; i < rangeCursors.length; i++) {
rangeCursors[i].reset();
}
}
rangeCursors = null;
}
@Override
public boolean hasNext() throws HyracksDataException, IndexException {
checkPriorityQueue();
return !outputPriorityQueue.isEmpty();
}
@Override
public void next() throws HyracksDataException {
outputElement = outputPriorityQueue.poll();
needPush = true;
}
@Override
public ICachedPage getPage() {
// do nothing
return null;
}
@Override
public void close() throws HyracksDataException {
if (lsmHarness != null) {
try {
outputPriorityQueue.clear();
for (int i = 0; i < rangeCursors.length; i++) {
rangeCursors[i].close();
}
rangeCursors = null;
} finally {
lsmHarness.endSearch(opCtx);
}
}
}
@Override
public void setBufferCache(IBufferCache bufferCache) {
// do nothing
}
@Override
public void setFileId(int fileId) {
// do nothing
}
@Override
public ITupleReference getTuple() {
return outputElement.getTuple();
}
protected boolean pushIntoPriorityQueue(PriorityQueueElement e) throws HyracksDataException, IndexException {
int cursorIndex = e.getCursorIndex();
if (rangeCursors[cursorIndex].hasNext()) {
rangeCursors[cursorIndex].next();
e.reset(rangeCursors[cursorIndex].getTuple());
outputPriorityQueue.offer(e);
return true;
}
rangeCursors[cursorIndex].close();
return false;
}
protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
}
abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
@Override
public boolean exclusiveLatchNodes() {
return false;
}
public class PriorityQueueElement {
private ITupleReference tuple;
private final int cursorIndex;
public PriorityQueueElement(int cursorIndex) {
tuple = null;
this.cursorIndex = cursorIndex;
}
public ITupleReference getTuple() {
return tuple;
}
public int getCursorIndex() {
return cursorIndex;
}
public void reset(ITupleReference tuple) {
this.tuple = tuple;
}
}
public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
protected final MultiComparator cmp;
public PriorityQueueComparator(MultiComparator cmp) {
this.cmp = cmp;
}
@Override
public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
int result = cmp.compare(elementA.getTuple(), elementB.getTuple());
if (result != 0) {
return result;
}
if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
return 1;
} else {
return -1;
}
}
public MultiComparator getMultiComparator() {
return cmp;
}
}
protected void setPriorityQueueComparator() {
if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
pqCmp = new PriorityQueueComparator(cmp);
}
}
protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB) {
return cmp.compare(tupleA, tupleB);
}
}