| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hyracks.storage.am.lsm.common.impls; |
| |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.PriorityQueue; |
| |
| import org.apache.hyracks.api.exceptions.HyracksDataException; |
| import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; |
| import org.apache.hyracks.storage.am.common.api.IIndexCursor; |
| import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; |
| import org.apache.hyracks.storage.am.common.api.IndexException; |
| import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator; |
| import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; |
| import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; |
| import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; |
| import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference; |
| import org.apache.hyracks.storage.common.buffercache.IBufferCache; |
| import org.apache.hyracks.storage.common.buffercache.ICachedPage; |
| |
| public abstract class LSMIndexSearchCursor implements ITreeIndexCursor { |
| protected final ILSMIndexOperationContext opCtx; |
| protected final boolean returnDeletedTuples; |
| protected PriorityQueueElement outputElement; |
| protected IIndexCursor[] rangeCursors; |
| protected PriorityQueueElement[] pqes; |
| protected PriorityQueue<PriorityQueueElement> outputPriorityQueue; |
| protected PriorityQueueComparator pqCmp; |
| protected MultiComparator cmp; |
| protected boolean needPush; |
| protected boolean includeMutableComponent; |
| protected ILSMHarness lsmHarness; |
| |
| protected List<ILSMComponent> operationalComponents; |
| |
| public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) { |
| this.opCtx = opCtx; |
| this.returnDeletedTuples = returnDeletedTuples; |
| outputElement = null; |
| needPush = false; |
| } |
| |
| public ILSMIndexOperationContext getOpCtx() { |
| return opCtx; |
| } |
| |
| public void initPriorityQueue() throws HyracksDataException, IndexException { |
| int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1; |
| if (outputPriorityQueue == null) { |
| outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp); |
| pqes = new PriorityQueueElement[pqInitSize]; |
| for (int i = 0; i < pqInitSize; i++) { |
| pqes[i] = new PriorityQueueElement(i); |
| } |
| for (int i = 0; i < rangeCursors.length; i++) { |
| pushIntoPriorityQueue(pqes[i]); |
| } |
| } else { |
| outputPriorityQueue.clear(); |
| // did size change? |
| if (pqInitSize == pqes.length) { |
| // size is the same -> re-use |
| for (int i = 0; i < rangeCursors.length; i++) { |
| pqes[i].reset(null); |
| pushIntoPriorityQueue(pqes[i]); |
| } |
| } else { |
| // size changed (due to flushes, merges, etc) -> re-create |
| pqes = new PriorityQueueElement[pqInitSize]; |
| for (int i = 0; i < rangeCursors.length; i++) { |
| pqes[i] = new PriorityQueueElement(i); |
| pushIntoPriorityQueue(pqes[i]); |
| } |
| } |
| } |
| } |
| |
| public IIndexCursor getCursor(int cursorIndex) { |
| return rangeCursors[cursorIndex]; |
| } |
| |
| @Override |
| public void reset() throws HyracksDataException, IndexException { |
| outputElement = null; |
| needPush = false; |
| |
| try { |
| if (outputPriorityQueue != null) { |
| outputPriorityQueue.clear(); |
| } |
| |
| if (rangeCursors != null) { |
| for (int i = 0; i < rangeCursors.length; i++) { |
| rangeCursors[i].reset(); |
| } |
| } |
| rangeCursors = null; |
| } finally { |
| if (lsmHarness != null) { |
| lsmHarness.endSearch(opCtx); |
| } |
| } |
| } |
| |
| @Override |
| public boolean hasNext() throws HyracksDataException, IndexException { |
| checkPriorityQueue(); |
| return !outputPriorityQueue.isEmpty(); |
| } |
| |
| @Override |
| public void next() throws HyracksDataException { |
| outputElement = outputPriorityQueue.poll(); |
| needPush = true; |
| } |
| |
| @Override |
| public ICachedPage getPage() { |
| // do nothing |
| return null; |
| } |
| |
| @Override |
| public void close() throws HyracksDataException { |
| try { |
| if (outputPriorityQueue != null) { |
| outputPriorityQueue.clear(); |
| } |
| for (int i = 0; i < rangeCursors.length; i++) { |
| rangeCursors[i].close(); |
| } |
| rangeCursors = null; |
| } finally { |
| if (lsmHarness != null) { |
| lsmHarness.endSearch(opCtx); |
| } |
| } |
| } |
| |
| @Override |
| public void setBufferCache(IBufferCache bufferCache) { |
| // do nothing |
| } |
| |
| @Override |
| public void setFileId(int fileId) { |
| // do nothing |
| } |
| |
| @Override |
| public ITupleReference getTuple() { |
| return outputElement.getTuple(); |
| } |
| |
| protected boolean pushIntoPriorityQueue(PriorityQueueElement e) throws HyracksDataException, IndexException { |
| int cursorIndex = e.getCursorIndex(); |
| if (rangeCursors[cursorIndex].hasNext()) { |
| rangeCursors[cursorIndex].next(); |
| e.reset(rangeCursors[cursorIndex].getTuple()); |
| outputPriorityQueue.offer(e); |
| return true; |
| } |
| rangeCursors[cursorIndex].close(); |
| return false; |
| } |
| |
| protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException { |
| return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter(); |
| } |
| |
| protected void checkPriorityQueue() throws HyracksDataException, IndexException { |
| while (!outputPriorityQueue.isEmpty() || (needPush == true)) { |
| if (!outputPriorityQueue.isEmpty()) { |
| PriorityQueueElement checkElement = outputPriorityQueue.peek(); |
| // If there is no previous tuple or the previous tuple can be ignored |
| if (outputElement == null) { |
| if (isDeleted(checkElement) && !returnDeletedTuples) { |
| // If the key has been deleted then pop it and set needPush to true. |
| // We cannot push immediately because the tuple may be |
| // modified if hasNext() is called |
| outputElement = outputPriorityQueue.poll(); |
| needPush = true; |
| } else { |
| break; |
| } |
| } else { |
| // Compare the previous tuple and the head tuple in the PQ |
| if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) { |
| // If the previous tuple and the head tuple are |
| // identical |
| // then pop the head tuple and push the next tuple from |
| // the tree of head tuple |
| |
| // the head element of PQ is useless now |
| PriorityQueueElement e = outputPriorityQueue.poll(); |
| pushIntoPriorityQueue(e); |
| } else { |
| // If the previous tuple and the head tuple are different |
| // the info of previous tuple is useless |
| if (needPush == true) { |
| pushIntoPriorityQueue(outputElement); |
| needPush = false; |
| } |
| outputElement = null; |
| } |
| } |
| } else { |
| // the priority queue is empty and needPush |
| pushIntoPriorityQueue(outputElement); |
| needPush = false; |
| outputElement = null; |
| } |
| } |
| } |
| |
| @Override |
| public boolean exclusiveLatchNodes() { |
| return false; |
| } |
| |
| public class PriorityQueueElement { |
| private ITupleReference tuple; |
| private int cursorIndex; |
| |
| public PriorityQueueElement(int cursorIndex) { |
| tuple = null; |
| this.cursorIndex = cursorIndex; |
| } |
| |
| public ITupleReference getTuple() { |
| return tuple; |
| } |
| |
| public int getCursorIndex() { |
| return cursorIndex; |
| } |
| |
| public void reset(ITupleReference tuple) { |
| this.tuple = tuple; |
| } |
| } |
| |
| public class PriorityQueueComparator implements Comparator<PriorityQueueElement> { |
| |
| protected MultiComparator cmp; |
| |
| public PriorityQueueComparator(MultiComparator cmp) { |
| this.cmp = cmp; |
| } |
| |
| @Override |
| public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) { |
| int result; |
| try { |
| result = cmp.compare(elementA.getTuple(), elementB.getTuple()); |
| if (result != 0) { |
| return result; |
| } |
| } catch (HyracksDataException e) { |
| throw new IllegalArgumentException(e); |
| } |
| |
| if (elementA.getCursorIndex() > elementB.getCursorIndex()) { |
| return 1; |
| } else { |
| return -1; |
| } |
| } |
| |
| public MultiComparator getMultiComparator() { |
| return cmp; |
| } |
| } |
| |
| protected void setPriorityQueueComparator() { |
| if (pqCmp == null || cmp != pqCmp.getMultiComparator()) { |
| pqCmp = new PriorityQueueComparator(cmp); |
| } |
| } |
| |
| protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB) |
| throws HyracksDataException { |
| return cmp.compare(tupleA, tupleB); |
| } |
| |
| @Override |
| public void markCurrentTupleAsUpdated() throws HyracksDataException { |
| throw new HyracksDataException("Updating tuples is not supported with this cursor."); |
| } |
| } |