blob: 2c5fb500aee50a7b9642191aaa9bd95b5a61f185 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.am.lsm.btree.impls;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.IIndexCursorStats;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
// Reusable tuple reference; it is reset to point at key bytes previously copied into a tuple builder.
private final ArrayTupleReference copyTuple;
// Predicate reused whenever a search is restarted; its low key is replaced with the copied key.
private final RangePredicate reusablePred;
// Callback taken from the cursor initial state in doOpen(); proceed/reconcile/cancel/complete are invoked on it.
private ISearchOperationCallback searchCallback;
// One accessor per operational component, index-aligned with rangeCursors.
private BTreeAccessor[] btreeAccessors;
// isMemoryComponent[i] records whether the component at index i was a memory component at the last doOpen().
private boolean[] isMemoryComponent;
// Lazily created builder used to copy the key fields of a tuple before the cursor at index 0 is closed.
private ArrayTupleBuilder tupleBuilder;
// True when checkPriorityQueue() may call searchCallback.proceed() for the current queue head.
private boolean canCallProceed = true;
// Result of the latest searchCallback.proceed() call, or true when the callback was skipped.
private boolean resultOfSearchCallbackProceed = false;
// Number of tuples doNext() has returned from cursor index 0 since this counter was last reset.
private int tupleFromMemoryComponentCount = 0;
/**
* Creates a cursor with {@code returnDeletedTuples} set to {@code false} and
* {@link NoOpIndexCursorStats#INSTANCE} as its statistics object.
*
* @param opCtx
*            the operation context passed on to the three-argument constructor
*/
public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
this(opCtx, false, NoOpIndexCursorStats.INSTANCE);
}
/**
* Creates a cursor and allocates the reusable key tuple and the reusable range predicate.
*
* @param opCtx
*            the operation context, forwarded to the superclass
* @param returnDeletedTuples
*            forwarded to the superclass; checkPriorityQueue() skips deleted heads only when this is false
* @param stats
*            the cursor statistics object, forwarded to the superclass
*/
public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples,
IIndexCursorStats stats) {
super(opCtx, returnDeletedTuples, stats);
this.copyTuple = new ArrayTupleReference();
// Both keys start as null; doOpen() sets the high key and the comparators, restarts set the low key.
this.reusablePred = new RangePredicate(null, null, true, true, null, null);
}
/**
* Closes the cursor through the superclass and then re-enables calls to the search callback's
* proceed() for the next use of this cursor.
*
* @throws HyracksDataException
*             if the superclass close fails; in that case {@code canCallProceed} is left unchanged
*/
@Override
public void doClose() throws HyracksDataException {
super.doClose();
canCallProceed = true;
}
/**
 * Makes the head of the output priority queue the current output element.
 * The element's cursor is flagged as needing a push back into the queue, and calls to the
 * search callback's proceed() are disabled until checkPriorityQueue() re-enables them.
 * Tuples taken from cursor index 0 are counted in {@code tupleFromMemoryComponentCount}.
 *
 * @throws HyracksDataException
 *             declared by the overridden method
 */
@Override
public void doNext() throws HyracksDataException {
    canCallProceed = false;
    needPushElementIntoQueue = true;
    final PriorityQueueElement polled = outputPriorityQueue.poll();
    outputElement = polled;
    final boolean fromFirstCursor = polled.getCursorIndex() == 0;
    if (fromFirstCursor) {
        tupleFromMemoryComponentCount += 1;
    }
}
/**
* Checks the priority queue and resets the top element if required.
* The priority queue can hold one element from each cursor.
* The boolean field {@code canCallProceed} controls whether the search callback's proceed() may be called
* for the current head element, i.e. the head can be returned if proceed() succeeds.
* If proceed() fails while the mutable component is included, the element of cursor 0 is removed from the
* queue, the head key is copied, cursor 0 is closed, the callback's reconcile() is called and the search on
* cursor 0 is restarted from the copied key. If the head key is unchanged afterwards the callback's
* complete() is called; otherwise cancel() is called and the loop starts over.
* Also, in order not to release the same element again, it keeps the previous output and checks it
* against the current head in the queue.
*
* @throws HyracksDataException
*             if a callback, cursor or search operation invoked here fails
*/
@Override
protected void checkPriorityQueue() throws HyracksDataException {
// Every SWITCH_COMPONENT_CYCLE calls, check if memory components need to be swapped with disk components
// We should do this regardless of the value of includeMutableComponent. This is because if the cursor
// of the memory component has gone past the end of the in memory component, then the includeMutableComponent
// will be set to false. Still, when that happens, we want to exit the memory component to allow it to be
// recycled and used for modifications.
if (hasNextCallCount >= SWITCH_COMPONENT_CYCLE) {
replaceMemoryComponentWithDiskComponentIfNeeded();
hasNextCallCount = 0;
}
// Loop until the head can be returned (break below) or nothing is left to examine or push.
while (!outputPriorityQueue.isEmpty() || needPushElementIntoQueue) {
if (!outputPriorityQueue.isEmpty()) {
PriorityQueueElement queueHead = outputPriorityQueue.peek();
if (canCallProceed) {
if (includeMutableComponent) {
resultOfSearchCallbackProceed = searchCallback.proceed(queueHead.getTuple());
if (!resultOfSearchCallbackProceed) {
// In case proceed() fails and there is an in-memory component,
// we can't simply use this element since there might be a change.
PriorityQueueElement mutableElement = remove(outputPriorityQueue, 0);
if (mutableElement != null) {
// Copies the current queue head
if (tupleBuilder == null) {
tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
}
TupleUtils.copyTuple(tupleBuilder, queueHead.getTuple(), cmp.getKeyFieldCount());
copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
// Unlatches/unpins the leaf page of the index.
rangeCursors[0].close();
// Reconcile.
searchCallback.reconcile(copyTuple);
// Re-traverses the index.
reusablePred.setLowKey(copyTuple, true);
btreeAccessors[0].search(rangeCursors[0], reusablePred);
pushIntoQueueFromCursorAndReplaceThisElement(mutableElement);
// now that we have completed the search and we have latches over the pages,
// it is safe to complete the operation.. but as per the API of the callback
// we only complete if we're producing this tuple
// get head again
queueHead = outputPriorityQueue.peek();
/*
* We need to restart in one of two cases:
* 1. no more elements in the priority queue.
* 2. the key of the head has changed (which means we need to call proceed)
*/
if (queueHead == null || cmp.compare(copyTuple, queueHead.getTuple()) != 0) {
// cancel since we're not continuing
searchCallback.cancel(copyTuple);
continue;
}
searchCallback.complete(copyTuple);
// it is safe to proceed now
} else {
// There are no more elements in the memory component.. can safely skip locking for the
// remaining operations
includeMutableComponent = false;
}
}
} else {
// only perform locking for tuples from memory components.
// all tuples from disk components have already been committed, and we're safe to proceed
resultOfSearchCallbackProceed = true;
}
}
// If there is no previous tuple or the previous tuple can be ignored.
// This check is needed not to release the same tuple again.
if (outputElement == null) {
if (isDeleted(queueHead) && !returnDeletedTuples) {
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
outputElement = outputPriorityQueue.poll();
needPushElementIntoQueue = true;
canCallProceed = false;
} else {
// The head is the next tuple to return; leave it at the top of the queue.
break;
}
} else {
// Compare the previous tuple and the head tuple in the PQ
if (compare(cmp, outputElement.getTuple(), queueHead.getTuple()) == 0) {
// If the previous tuple and the head tuple are
// identical
// then pop the head tuple and push the next tuple from
// the tree of head tuple
// the head element of PQ is useless now
PriorityQueueElement e = outputPriorityQueue.poll();
pushIntoQueueFromCursorAndReplaceThisElement(e);
} else {
// If the previous tuple and the head tuple are different
// the info of previous tuple is useless
pushOutputElementIntoQueueIfNeeded();
canCallProceed = true;
outputElement = null;
}
}
} else {
// the priority queue is empty and needPush
pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
needPushElementIntoQueue = false;
outputElement = null;
canCallProceed = true;
}
}
}
/**
 * Hands the current output element to pushIntoQueueFromCursorAndReplaceThisElement() when a push is
 * pending, then clears the pending flag. Does nothing when no push is pending.
 *
 * @throws HyracksDataException
 *             if the push fails; the pending flag then stays set
 */
private void pushOutputElementIntoQueueIfNeeded() throws HyracksDataException {
    if (!needPushElementIntoQueue) {
        return;
    }
    pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
    needPushElementIntoQueue = false;
}
/**
 * Asks replaceFrom() whether any component has to be switched. If so, the harness replaces the
 * memory components with disk components and the search is restarted on every switched cursor that
 * still had an element. If not, the search on the memory component may be redone so that the
 * cursor does not keep blocking modifications.
 *
 * @throws HyracksDataException
 *             if the replacement or a restarted search fails
 */
private void replaceMemoryComponentWithDiskComponentIfNeeded() throws HyracksDataException {
    final int firstSwitched = replaceFrom();
    if (firstSwitched >= 0) {
        opCtx.getIndex().getHarness().replaceMemoryComponentsWithDiskComponents(getOpCtx(), firstSwitched);
        // switchRequest has one slot per memory component, which can exceed the number of
        // operational components (e.g. 0 disk components and 1 memory component), so the loop
        // stops at whichever bound is reached first.
        for (int slot = firstSwitched; slot < switchRequest.length
                && slot < operationalComponents.size(); slot++) {
            if (switchRequest[slot]) {
                final ILSMComponent replacement = operationalComponents.get(slot);
                final BTree replacementTree = (BTree) replacement.getIndex();
                if (slot == 0 && replacement.getType() != LSMComponentType.MEMORY) {
                    includeMutableComponent = false;
                }
                if (switchedElements[slot] != null) {
                    // Restart the search on the new component from the key saved by replaceFrom().
                    copyTuple.reset(switchComponentTupleBuilders[slot].getFieldEndOffsets(),
                            switchComponentTupleBuilders[slot].getByteArray());
                    reusablePred.setLowKey(copyTuple, true);
                    final IIndexCursor cursor = rangeCursors[slot];
                    final BTreeAccessor accessor = btreeAccessors[slot];
                    cursor.close();
                    accessor.reset(replacementTree, iap);
                    accessor.search(cursor, reusablePred);
                    // The element at the restart key was already consumed before the switch,
                    // so step onto it again and refresh the saved element with it.
                    if (cursor.hasNext()) {
                        cursor.next();
                        switchedElements[slot].reset(cursor.getTuple());
                    }
                }
            }
            switchRequest[slot] = false;
            switchedElements[slot] = null;
            // Once one slot is found not to hold a disk component, further switches are pointless.
            if (switchPossible && operationalComponents.get(slot).getType() != LSMComponentType.DISK) {
                switchPossible = false;
            }
        }
        return;
    }
    // No switch is needed. Searches and modifications compete for the pages of the memory
    // component, so the search may be redone to release latches.
    redoMemoryComponentSearchIfNeeded();
}
/**
 * Scans the operational components from index 0 up to the first disk component and marks every
 * component in state UNREADABLE_UNWRITABLE as a switch candidate. For each candidate the key of its
 * active element (if any) is copied so the search can be restarted later, its cursor is closed,
 * and the request is recorded in {@code switchRequest} / {@code switchedElements}.
 *
 * @return the index of the first switch candidate, or -1 if there is none or switching is no
 *         longer possible
 * @throws HyracksDataException
 *             if closing a cursor or copying a key fails
 * @throws IllegalStateException
 *             if the queue still holds an element from the cursor of the current output element
 */
private int replaceFrom() throws HyracksDataException {
    if (!switchPossible) {
        return -1;
    }
    int firstCandidate = -1;
    for (int idx = 0; idx < operationalComponents.size(); idx++) {
        final ILSMComponent current = operationalComponents.get(idx);
        if (current.getType() == LSMComponentType.DISK) {
            // A disk component in the first position means there is nothing left to switch.
            if (idx == 0) {
                switchPossible = false;
            }
            break;
        }
        if (current.getState() != ComponentState.UNREADABLE_UNWRITABLE) {
            continue;
        }
        // UNREADABLE_UNWRITABLE: per the original note, the flush completed while this cursor
        // was still inside the component.
        if (firstCandidate < 0) {
            firstCandidate = idx;
        }
        final PriorityQueueElement queued = findElement(outputPriorityQueue, idx);
        final PriorityQueueElement element;
        if (outputElement != null && outputElement.getCursorIndex() == idx) {
            // The output element was polled, so the queue must not hold another one from its cursor.
            if (queued != null) {
                throw new IllegalStateException("found element in the queue from the cursor of output element");
            }
            element = outputElement;
        } else {
            element = queued;
        }
        // A cursor that still has an element gets its search key saved for the restart.
        if (element != null) {
            if (switchComponentTupleBuilders[idx] == null) {
                switchComponentTupleBuilders[idx] = new ArrayTupleBuilder(cmp.getKeyFieldCount());
            }
            TupleUtils.copyTuple(switchComponentTupleBuilders[idx], element.getTuple(), cmp.getKeyFieldCount());
        }
        rangeCursors[idx].close();
        switchRequest[idx] = true;
        switchedElements[idx] = element;
    }
    return firstCandidate;
}
/**
 * Restarts the search on cursor 0 when no tuple was returned from it since the counter was last
 * reset and the first operational component reports active writers. Closing and re-searching
 * releases the page the cursor was holding so modifications are not blocked. The counter is reset
 * whenever the mutable component is included, whether or not the search was redone.
 *
 * @throws HyracksDataException
 *             if copying the key, closing the cursor or the new search fails
 */
private void redoMemoryComponentSearchIfNeeded() throws HyracksDataException {
    if (!includeMutableComponent) {
        return;
    }
    final boolean memoryCursorStalled = tupleFromMemoryComponentCount == 0
            && ((AbstractLSMMemoryComponent) operationalComponents.get(0)).getWriterCount() > 0;
    if (memoryCursorStalled) {
        // The mutable element cannot be the output element here: had it been, the counter
        // would be at least 1.
        final PriorityQueueElement memoryElement = remove(outputPriorityQueue, 0);
        // A null element means cursor 0 has nothing queued, so there is nothing to release.
        if (memoryElement != null) {
            if (tupleBuilder == null) {
                tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
            }
            TupleUtils.copyTuple(tupleBuilder, memoryElement.getTuple(), cmp.getKeyFieldCount());
            copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
            // Unlatch/unpin the leaf page, then traverse the index again from the copied key.
            rangeCursors[0].close();
            reusablePred.setLowKey(copyTuple, true);
            btreeAccessors[0].search(rangeCursors[0], reusablePred);
            pushIntoQueueFromCursorAndReplaceThisElement(memoryElement);
        }
    }
    tupleFromMemoryComponentCount = 0;
}
/**
 * Removes from the queue the first element, in iterator order, that belongs to the given cursor.
 *
 * @param outputPriorityQueue
 *            the queue to scan
 * @param cursorIndex
 *            the cursor index to look for
 * @return the removed element, or {@code null} if no element of that cursor is queued
 */
private PriorityQueueElement remove(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
    for (Iterator<PriorityQueueElement> scan = outputPriorityQueue.iterator(); scan.hasNext();) {
        final PriorityQueueElement candidate = scan.next();
        if (candidate.getCursorIndex() != cursorIndex) {
            continue;
        }
        scan.remove();
        return candidate;
    }
    return null;
}
/**
 * Looks up, without removing it, the first element in iterator order that belongs to the given cursor.
 *
 * @param outputPriorityQueue
 *            the queue to scan
 * @param cursorIndex
 *            the cursor index to look for
 * @return the matching element, or {@code null} if no element of that cursor is queued
 */
private PriorityQueueElement findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
    for (PriorityQueueElement candidate : outputPriorityQueue) {
        if (candidate.getCursorIndex() == cursorIndex) {
            return candidate;
        }
    }
    return null;
}
/**
 * Opens the cursor over the operational components listed in the initial state.
 * The per-component accessors and cursors are reused when the number of components is unchanged
 * and each slot still holds the same kind of component (memory or disk); otherwise they are
 * destroyed and recreated. All cursors are then opened with {@code searchPred} and the priority
 * queue is initialised.
 *
 * @param initialState
 *            must be an {@link LSMBTreeCursorInitialState}
 * @param searchPred
 *            the predicate used to open the per-component cursors
 * @throws HyracksDataException
 *             if destroying stale accessors or cursors, opening the cursors or initialising the queue fails
 */
@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
    final LSMBTreeCursorInitialState btreeInitialState = (LSMBTreeCursorInitialState) initialState;
    cmp = btreeInitialState.getOriginalKeyComparator();
    operationalComponents = btreeInitialState.getOperationalComponents();
    lsmHarness = btreeInitialState.getLSMHarness();
    searchCallback = btreeInitialState.getSearchOperationCallback();

    // The reusable predicate keeps the caller's upper bound; its low key is set on each restart.
    final RangePredicate callerPredicate = (RangePredicate) btreeInitialState.getSearchPredicate();
    reusablePred.setLowKeyComparator(cmp);
    reusablePred.setHighKey(callerPredicate.getHighKey(), callerPredicate.isHighKeyInclusive());
    reusablePred.setHighKeyComparator(callerPredicate.getHighKeyComparator());

    includeMutableComponent = false;
    final int componentCount = operationalComponents.size();
    if (rangeCursors != null && rangeCursors.length != componentCount) {
        // The component count changed: destroy the old accessors and cursors before reallocating.
        Throwable destroyFailure = CleanupUtils.destroy(null, btreeAccessors);
        btreeAccessors = null;
        destroyFailure = CleanupUtils.destroy(destroyFailure, rangeCursors);
        rangeCursors = null;
        if (destroyFailure != null) {
            throw HyracksDataException.create(destroyFailure);
        }
    }
    if (rangeCursors == null) {
        // object creation: should be relatively low
        rangeCursors = new IIndexCursor[componentCount];
        btreeAccessors = new BTreeAccessor[componentCount];
        isMemoryComponent = new boolean[componentCount];
    }

    for (int slot = 0; slot < componentCount; slot++) {
        final ILSMComponent component = operationalComponents.get(slot);
        final boolean inMemory = component.getType() == LSMComponentType.MEMORY;
        if (inMemory) {
            includeMutableComponent = true;
        }
        final BTree btree = (BTree) component.getIndex();
        final boolean needsFreshAccessor = btreeAccessors[slot] == null || destroyIncompatible(component, slot);
        if (needsFreshAccessor) {
            btreeAccessors[slot] = btree.createAccessor(iap);
            rangeCursors[slot] = btreeAccessors[slot].createSearchCursor(false);
        } else {
            // Same kind of component as before: re-point the accessor and recycle the cursor.
            btreeAccessors[slot].reset(btree, iap);
            rangeCursors[slot].close();
        }
        isMemoryComponent[slot] = inMemory;
    }

    IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
    try {
        setPriorityQueueComparator();
        initPriorityQueue();
        canCallProceed = true;
    } catch (Throwable th) { // NOSONAR Must catch all
        // Close the cursors that were just opened before reporting the failure.
        IndexCursorUtils.close(rangeCursors, th);
        throw HyracksDataException.create(th);
    }
}
/**
 * Destroys the accessor and cursor stored at {@code index} when the kind of component in that
 * slot changed between memory and disk, because they then have to be recreated.
 *
 * @param component
 *            the component that now occupies the slot
 * @param index
 *            the slot to check
 * @return {@code true} if the slot's accessor and cursor were destroyed and cleared,
 *         {@code false} if the kind of component is unchanged
 * @throws HyracksDataException
 *             if destroying the accessor or the cursor reported a failure; both slots are already
 *             cleared at that point
 */
private boolean destroyIncompatible(ILSMComponent component, int index) throws HyracksDataException {
    final boolean isMemoryNow = component.getType() == LSMComponentType.MEMORY;
    if (isMemoryNow == isMemoryComponent[index]) {
        return false;
    }
    Throwable problem = CleanupUtils.destroy(null, btreeAccessors[index]);
    btreeAccessors[index] = null;
    problem = CleanupUtils.destroy(problem, rangeCursors[index]);
    rangeCursors[index] = null;
    if (problem == null) {
        return true;
    }
    throw HyracksDataException.create(problem);
}
/**
* Returns the value last stored by checkPriorityQueue(): the result of the most recent
* searchCallback.proceed() call, or {@code true} when the callback was skipped because the
* mutable component is not included. The initial value is {@code false}.
*
* @return the stored proceed result
*/
@Override
public boolean getSearchOperationCallbackProceedResult() {
return resultOfSearchCallbackProceed;
}
}