blob: 284052b4751562602b1665083c7c13ef97be91a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.pagememory.index;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnIndexStorageState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.freelist.FreeListImpl;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.util.GradualTask;
import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
import org.apache.ignite.internal.storage.pagememory.index.common.IndexRowKey;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaKey;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import org.apache.ignite.internal.storage.pagememory.index.meta.UpdateLastRowIdUuidToBuildInvokeClosure;
import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
/**
* Abstract index storage based on Page Memory.
*/
public abstract class AbstractPageMemoryIndexStorage<K extends IndexRowKey, V extends K, TreeT extends BplusTree<K, V>>
implements IndexStorage {
private static final IgniteLogger LOG = Loggers.forClass(AbstractPageMemoryIndexStorage.class);
/** Partition id. */
protected final int partitionId;
/** Lowest possible RowId according to signed long ordering. */
protected final RowId lowestRowId;
/** Highest possible RowId according to signed long ordering. */
protected final RowId highestRowId;
/** Current state of the storage. */
protected final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
/** Index meta tree instance. */
private volatile IndexMetaTree indexMetaTree;
/** Free list. */
protected volatile FreeListImpl freeList;
/** Index tree instance. */
protected volatile TreeT indexTree;
/** Row ID for which the index needs to be built, {@code null} means that the index building has completed. */
private volatile @Nullable RowId nextRowIdToBuild;
/** Busy lock. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Index ID. */
private final int indexId;
private final boolean isVolatile;
protected AbstractPageMemoryIndexStorage(
IndexMeta indexMeta,
int partitionId,
TreeT indexTree,
FreeListImpl freeList,
IndexMetaTree indexMetaTree,
boolean isVolatile
) {
this.indexId = indexMeta.indexId();
this.partitionId = partitionId;
this.indexTree = indexTree;
this.freeList = freeList;
this.indexMetaTree = indexMetaTree;
this.isVolatile = isVolatile;
lowestRowId = RowId.lowestRowId(partitionId);
highestRowId = RowId.highestRowId(partitionId);
nextRowIdToBuild = getNextRowIdToBuild(indexMeta);
}
private @Nullable RowId getNextRowIdToBuild(IndexMeta indexMeta) {
UUID uuid = indexMeta.nextRowIdUuidToBuild();
return uuid == null ? null : new RowId(partitionId, uuid);
}
@Override
public @Nullable RowId getNextRowIdToBuild() {
return busyNonDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
return nextRowIdToBuild;
});
}
@Override
public void setNextRowIdToBuild(@Nullable RowId rowId) {
busyNonDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
UUID rowIdUuid = rowId == null ? null : rowId.uuid();
try {
indexMetaTree.invoke(new IndexMetaKey(indexId), null, new UpdateLastRowIdUuidToBuildInvokeClosure(rowIdUuid));
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error updating last row ID uuid to build: [{}, rowId={}]", e, createStorageInfo(), rowId);
}
nextRowIdToBuild = rowId;
return null;
});
}
/** Closes the index storage. */
public void close() {
if (!transitionToTerminalState(StorageState.CLOSED)) {
return;
}
busyLock.block();
closeStructures();
}
/**
* If not already in a terminal state, transitions to the supplied state and returns {@code true}, otherwise just returns {@code false}.
*/
private boolean transitionToTerminalState(StorageState targetState) {
return StorageUtils.transitionToTerminalState(targetState, state);
}
/**
* Prepares storage for rebalancing.
*
* @throws StorageRebalanceException If there was an error when starting the rebalance.
*/
public void startRebalance() {
if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
}
// Changed storage states and expect all storage operations to stop soon.
busyLock.block();
try {
closeStructures();
} finally {
busyLock.unblock();
}
}
/**
* Completes the rebalancing of the storage.
*
* @throws StorageRebalanceException If there is an error while completing the storage rebalance.
*/
public void completeRebalance() {
if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
}
}
/**
* Prepares the storage for cleanup.
*
* <p>After cleanup (successful or not), method {@link #finishCleanup()} must be called.
*/
public void startCleanup() {
if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
}
// Changed storage states and expect all storage operations to stop soon.
busyLock.block();
try {
closeStructures();
} finally {
busyLock.unblock();
}
}
/**
* Finishes cleanup up the storage.
*/
public void finishCleanup() {
state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
}
/**
* Starts destruction of the data stored by this index partition.
*
* @param executor {@link GradualTaskExecutor} on which to destroy.
* @return Future that gets completed when the destruction operation finishes.
* @throws StorageException If something goes wrong.
*/
public final CompletableFuture<Void> startDestructionOn(GradualTaskExecutor executor) throws StorageException {
try {
int maxWorkUnits = isVolatile
? VolatilePageMemoryStorageEngine.MAX_DESTRUCTION_WORK_UNITS
: PersistentPageMemoryStorageEngine.MAX_DESTRUCTION_WORK_UNITS;
return executor.execute(createDestructionTask(maxWorkUnits))
.whenComplete((res, e) -> {
if (e != null) {
LOG.error("Unable to destroy index {}", e, indexId);
}
});
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Unable to destroy index " + indexId, e);
}
}
/**
* Transitions this storage to the {@link StorageState#DESTROYED} state. Blocks the busy lock, but does not
* close the structures (they will have to be closed by calling {@link #closeStructures()}).
*
* @return {@code true} if this call actually made the transition and, hence, the caller must call {@link #closeStructures()}.
*/
public boolean transitionToDestroyedState() {
if (!transitionToTerminalState(StorageState.DESTROYED)) {
return false;
}
busyLock.block();
return true;
}
protected abstract GradualTask createDestructionTask(int maxWorkUnits) throws IgniteInternalCheckedException;
protected String createStorageInfo() {
return IgniteStringFormatter.format("indexId={}, partitionId={}", indexId, partitionId);
}
/**
* Closes internal structures.
*/
public void closeStructures() {
indexTree.close();
nextRowIdToBuild = null;
}
/**
* Updates the internal data structures of the storage on rebalance or cleanup.
*
* @param freeList Free list.
* @param indexTree Hash index tree instance.
* @throws StorageException If failed.
*/
public void updateDataStructures(IndexMetaTree indexMetaTree, FreeListImpl freeList, TreeT indexTree) {
throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(), this::createStorageInfo);
this.indexMetaTree = indexMetaTree;
this.freeList = freeList;
this.indexTree = indexTree;
try {
IndexMeta indexMeta = indexMetaTree.findOne(new IndexMetaKey(indexId), null);
assert indexMeta != null : indexId;
this.nextRowIdToBuild = getNextRowIdToBuild(indexMeta);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error reading next row ID uuid to build: [{}]", e, createStorageInfo());
}
}
/** Constant that represents the absence of value in {@link ScanCursor}. Not equivalent to {@code null} value. */
private static final IndexRowKey NO_INDEX_ROW = () -> null;
/**
* Invoke a supplier that performs an operation that is not a data read.
*
* @param supplier Operation closure.
* @return Whatever the supplier returns.
*/
protected <T> T busyNonDataRead(Supplier<T> supplier) {
return busy(supplier, false);
}
/**
* Invoke a supplier that performs an operation that is a data read.
*
* @param supplier Operation closure.
* @return Whatever the supplier returns.
*/
protected <T> T busyDataRead(Supplier<T> supplier) {
return busy(supplier, true);
}
private <T> T busy(Supplier<T> supplier, boolean read) {
if (!busyLock.enterBusy()) {
throwExceptionDependingOnIndexStorageState(state.get(), read, createStorageInfo());
}
try {
return supplier.get();
} finally {
busyLock.leaveBusy();
}
}
/**
* Cursor that always returns up-to-date next element.
*
* @param <R> Type of the returned value.
*/
protected abstract class ScanCursor<R> implements PeekCursor<R> {
protected final TreeT localTree = indexTree;
private final @Nullable K lower;
private @Nullable Boolean hasNext;
/**
* Last row used in mapping in the {@link #next()} call. {@code null} upon cursor creation or after {@link #hasNext()} returned
* {@code null}.
*/
private @Nullable V treeRow;
/**
* Row used in the mapping of the latest {@link #peek()} call, that was performed after the last {@link #next()} call.
* {@link #NO_INDEX_ROW} if there was no such call.
*/
private @Nullable V peekedRow = (V) NO_INDEX_ROW;
protected ScanCursor(@Nullable K lower) {
this.lower = lower;
}
/**
* Maps value from the index tree into the required result.
*/
protected abstract R map(V value);
/**
* Check whether the passed value exceeds the upper bound for the scan.
*/
protected abstract boolean exceedsUpperBound(V value);
@Override
public void close() {
// No-op.
}
@Override
public boolean hasNext() {
return busyDataRead(() -> {
try {
return advanceIfNeededBusy();
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error while advancing the cursor", e);
}
});
}
@Override
public R next() {
return busyDataRead(() -> {
try {
if (!advanceIfNeededBusy()) {
throw new NoSuchElementException();
}
this.hasNext = null;
return map(treeRow);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error while advancing the cursor", e);
}
});
}
@Override
public @Nullable R peek() {
return busyDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryIndexStorage.this::createStorageInfo);
try {
return map(peekBusy());
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Error when peeking next element", e);
}
});
}
private @Nullable V peekBusy() throws IgniteInternalCheckedException {
if (hasNext != null) {
return treeRow;
}
if (treeRow == null) {
peekedRow = lower == null ? localTree.findFirst() : localTree.findNext(lower, true);
} else {
peekedRow = localTree.findNext(treeRow, false);
}
if (peekedRow != null && exceedsUpperBound(peekedRow)) {
peekedRow = null;
}
return peekedRow;
}
private boolean advanceIfNeededBusy() throws IgniteInternalCheckedException {
throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryIndexStorage.this::createStorageInfo);
if (hasNext != null) {
return hasNext;
}
treeRow = (peekedRow == NO_INDEX_ROW) ? peekBusy() : peekedRow;
peekedRow = (V) NO_INDEX_ROW;
hasNext = treeRow != null;
return hasNext;
}
}
protected void throwExceptionIfIndexIsNotBuilt() {
StorageUtils.throwExceptionIfIndexIsNotBuilt(nextRowIdToBuild, this::createStorageInfo);
}
}