blob: 53439cc0bdcd6762a9a2586d9843043bc0415a98 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage.TABLE_PREFIX_SIZE_BYTES;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.putLongToBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_REBALANCE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
/**
* Tx state storage implementation based on RocksDB.
*/
public class TxStateRocksDbStorage implements TxStateStorage {
    /** Prefix length for the payload. Consists of tableId (4 bytes) and partitionId (2 bytes), both in Big Endian. */
    private static final int PREFIX_SIZE_BYTES = TABLE_PREFIX_SIZE_BYTES + Short.BYTES;

    /** Size of the key in the storage. Consists of {@link #PREFIX_SIZE_BYTES} and a UUID (2x {@link Long#BYTES}). */
    private static final int FULL_KEY_SIZE_BYTES = PREFIX_SIZE_BYTES + 2 * Long.BYTES;

    /** Partition id. */
    private final int partitionId;

    /** Transaction state table storage. */
    private final TxStateRocksDbTableStorage tableStorage;

    /** Collection of opened RocksDB iterators. Tracked so they can be force-closed when the storage stops. */
    private final Set<RocksIterator> iterators = ConcurrentHashMap.newKeySet();

    /** Busy lock. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Database key for the last applied index+term. Equals the bare partition prefix (no UUID suffix). */
    private final byte[] lastAppliedIndexAndTermKey;

    /** Shared TX state storage. */
    private final TxStateRocksDbSharedStorage sharedStorage;

    /** Table ID. */
    private final int tableId;

    /** On-heap-cached last applied index value. */
    private volatile long lastAppliedIndex;

    /** On-heap-cached last applied term value. */
    private volatile long lastAppliedTerm;

    /** Current state of the storage. */
    private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);

    /**
     * The constructor.
     *
     * @param partitionId Partition id.
     * @param tableStorage Table storage.
     */
    TxStateRocksDbStorage(
            int partitionId,
            TxStateRocksDbTableStorage tableStorage
    ) {
        this.partitionId = partitionId;
        this.tableStorage = tableStorage;
        this.sharedStorage = tableStorage.sharedStorage;
        this.tableId = tableStorage.id;

        this.lastAppliedIndexAndTermKey = ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BIG_ENDIAN)
                .putInt(tableId)
                .putShort((short) partitionId)
                .array();
    }

    /**
     * Starts the storage, restoring the cached last applied index/term from persistent state.
     *
     * @throws IgniteInternalException In case when the operation has failed.
     */
    public void start() {
        busy(() -> {
            byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(sharedStorage.readOptions);

            if (indexAndTermBytes != null) {
                lastAppliedIndex = bytesToLong(indexAndTermBytes);
                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
            }

            return null;
        });
    }

    @Override
    public @Nullable TxMeta get(UUID txId) {
        return busy(() -> {
            try {
                throwExceptionIfStorageInProgressOfRebalance();

                byte[] txMetaBytes = sharedStorage.db().get(txIdToKey(txId));

                return txMetaBytes == null ? null : fromBytes(txMetaBytes);
            } catch (RocksDBException e) {
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_ERR,
                        format("Failed to get a value from storage: [{}]", createStorageInfo()),
                        e
                );
            }
        });
    }

    @Override
    public void put(UUID txId, TxMeta txMeta) {
        busy(() -> {
            try {
                sharedStorage.db().put(txIdToKey(txId), toBytes(txMeta));

                return null;
            } catch (RocksDBException e) {
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_ERR,
                        format("Failed to put a value into storage: [{}]", createStorageInfo()),
                        e
                );
            }
        });
    }

    @Override
    public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex, long commandTerm) {
        return busy(() -> {
            try (WriteBatch writeBatch = new WriteBatch()) {
                byte[] txIdBytes = txIdToKey(txId);

                // Reuse the serialized key instead of serializing it a second time.
                byte[] txMetaExistingBytes = sharedStorage.db().get(sharedStorage.readOptions, txIdBytes);

                boolean result;

                if (txMetaExistingBytes == null && txStateExpected == null) {
                    // No existing entry and caller expected none: create it.
                    writeBatch.put(txIdBytes, toBytes(txMeta));

                    result = true;
                } else {
                    if (txMetaExistingBytes != null) {
                        TxMeta txMetaExisting = fromBytes(txMetaExistingBytes);

                        if (txMetaExisting.txState() == txStateExpected) {
                            writeBatch.put(txIdBytes, toBytes(txMeta));

                            result = true;
                        } else {
                            // CAS failed on state, but treat an identical (state, commitTimestamp) pair as success
                            // so that a retried command is idempotent.
                            result = txMetaExisting.txState() == txMeta.txState()
                                    && Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
                        }
                    } else {
                        result = false;
                    }
                }

                // If the store is in the process of rebalancing, then there is no need to update lastAppliedIndex and lastAppliedTerm.
                // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will
                // have non-consistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long).
                if (state.get() != StorageState.REBALANCE) {
                    updateLastApplied(writeBatch, commandIndex, commandTerm);
                }

                sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);

                return result;
            } catch (RocksDBException e) {
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_ERR,
                        format("Failed to perform CAS operation over a value in storage: [{}]", createStorageInfo()),
                        e
                );
            }
        });
    }

    @Override
    public void remove(UUID txId, long commandIndex, long commandTerm) {
        busy(() -> {
            try (WriteBatch writeBatch = new WriteBatch()) {
                throwExceptionIfStorageInProgressOfRebalance();

                writeBatch.delete(txIdToKey(txId));

                // If the store is in the process of rebalancing, then there is no need to update lastAppliedIndex and lastAppliedTerm.
                // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will
                // have non-consistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long).
                if (state.get() != StorageState.REBALANCE) {
                    updateLastApplied(writeBatch, commandIndex, commandTerm);
                }

                sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);

                return null;
            } catch (RocksDBException e) {
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_ERR,
                        format("Failed to remove a value from storage: [{}]", createStorageInfo()),
                        e
                );
            }
        });
    }

    @Override
    public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
        return busy(() -> {
            throwExceptionIfStorageInProgressOfRebalance();

            // This lower bound is the lowest possible key that goes after "lastAppliedIndexAndTermKey".
            byte[] lowerBound = ByteBuffer.allocate(PREFIX_SIZE_BYTES + 1).order(BIG_ENDIAN)
                    .putInt(tableId)
                    .putShort((short) partitionId)
                    .put((byte) 0)
                    .array();

            // The Slice is a native resource and must stay alive for as long as the iterator uses it,
            // and must be explicitly closed afterwards.
            Slice upperBound = new Slice(partitionEndPrefix());

            ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBound);

            RocksIterator rocksIterator = sharedStorage.db().newIterator(readOptions);

            iterators.add(rocksIterator);

            try {
                // Skip applied index value.
                rocksIterator.seek(lowerBound);
            } catch (Exception e) {
                // Unlikely, but what if...
                iterators.remove(rocksIterator);

                // Release all native resources created above, not just the iterator.
                rocksIterator.close();
                readOptions.close();
                upperBound.close();

                throw e;
            }

            return new RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>>(rocksIterator) {
                @Override
                protected IgniteBiTuple<UUID, TxMeta> decodeEntry(byte[] keyBytes, byte[] valueBytes) {
                    UUID key = keyToTxId(keyBytes);
                    TxMeta txMeta = fromBytes(valueBytes);

                    return new IgniteBiTuple<>(key, txMeta);
                }

                @Override
                public boolean hasNext() {
                    return busy(() -> {
                        throwExceptionIfStorageInProgressOfRebalance();

                        return super.hasNext();
                    });
                }

                @Override
                public IgniteBiTuple<UUID, TxMeta> next() {
                    return busy(() -> {
                        throwExceptionIfStorageInProgressOfRebalance();

                        return super.next();
                    });
                }

                @Override
                public void close() {
                    iterators.remove(rocksIterator);

                    // Close the iterator (done by super.close()) BEFORE the ReadOptions and the Slice:
                    // RocksDB requires the options and the bound slice to outlive any iterator using them.
                    super.close();
                    readOptions.close();
                    upperBound.close();
                }
            };
        });
    }

    @Override
    public CompletableFuture<Void> flush() {
        return busy(() -> sharedStorage.awaitFlush(true));
    }

    @Override
    public long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    @Override
    public long lastAppliedTerm() {
        return lastAppliedTerm;
    }

    @Override
    public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) {
        busy(() -> {
            try {
                throwExceptionIfStorageInProgressOfRebalance();

                sharedStorage.db().put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));

                this.lastAppliedIndex = lastAppliedIndex;
                this.lastAppliedTerm = lastAppliedTerm;

                return null;
            } catch (RocksDBException e) {
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_ERR,
                        format("Failed to write applied index value to storage: [{}]", createStorageInfo()),
                        e
                );
            }
        });
    }

    /** Serializes the index and the term into a 16-byte array: index first, then term. */
    private static byte[] indexAndTermToBytes(long lastAppliedIndex, long lastAppliedTerm) {
        byte[] bytes = new byte[2 * Long.BYTES];

        putLongToBytes(lastAppliedIndex, bytes, 0);
        putLongToBytes(lastAppliedTerm, bytes, Long.BYTES);

        return bytes;
    }

    /**
     * Reads the value of {@link #lastAppliedIndex} from the storage.
     *
     * <p>NOTE(review): this method currently has no callers in this file; kept for API symmetry — consider removing.
     *
     * @param readOptions Read options to be used for reading.
     * @return The value of last applied index, or {@code 0} if it has never been persisted.
     */
    private long readLastAppliedIndex(ReadOptions readOptions) {
        byte[] bytes = readLastAppliedIndexAndTerm(readOptions);

        if (bytes == null) {
            return 0;
        }

        return bytesToLong(bytes);
    }

    /**
     * Reads the persisted index+term pair, or {@code null} if it has never been written.
     *
     * @param readOptions Read options to be used for reading.
     */
    private byte @Nullable [] readLastAppliedIndexAndTerm(ReadOptions readOptions) {
        try {
            return sharedStorage.db().get(readOptions, lastAppliedIndexAndTermKey);
        } catch (RocksDBException e) {
            throw new IgniteInternalException(
                    TX_STATE_STORAGE_ERR,
                    format("Failed to read applied term value from storage: [{}]", createStorageInfo()),
                    e
            );
        }
    }

    @Override
    public void destroy() {
        if (!tryToCloseStorageAndResources()) {
            // Already closed (and therefore already destroyed or being destroyed elsewhere).
            return;
        }

        try (WriteBatch writeBatch = new WriteBatch()) {
            clearStorageData(writeBatch);

            writeBatch.delete(lastAppliedIndexAndTermKey);

            sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);
        } catch (Exception e) {
            throw new IgniteInternalException(TX_STATE_STORAGE_ERR, format("Failed to destroy storage: [{}]", createStorageInfo()), e);
        }
    }

    /** Inclusive start of this partition's key range. Equal to {@link #lastAppliedIndexAndTermKey}. */
    private byte[] partitionStartPrefix() {
        return ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BIG_ENDIAN)
                .putInt(tableId)
                .putShort((short) (partitionId))
                .array();
    }

    /** Exclusive end of this partition's key range: the prefix of the next partition. */
    private byte[] partitionEndPrefix() {
        return ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BIG_ENDIAN)
                .putInt(tableId)
                .putShort((short) (partitionId + 1))
                .array();
    }

    /** Builds the full storage key for a transaction: partition prefix followed by the UUID's two longs. */
    private byte[] txIdToKey(UUID txId) {
        return ByteBuffer.allocate(FULL_KEY_SIZE_BYTES).order(BIG_ENDIAN)
                .putInt(tableId)
                .putShort((short) partitionId)
                .putLong(txId.getMostSignificantBits())
                .putLong(txId.getLeastSignificantBits())
                .array();
    }

    /** Inverse of {@link #txIdToKey(UUID)}: extracts the UUID from a full storage key. */
    private UUID keyToTxId(byte[] bytes) {
        long msb = bytesToLong(bytes, PREFIX_SIZE_BYTES);
        long lsb = bytesToLong(bytes, PREFIX_SIZE_BYTES + Long.BYTES);

        return new UUID(msb, lsb);
    }

    @Override
    public void close() {
        tryToCloseStorageAndResources();
    }

    @Override
    public CompletableFuture<Void> startRebalance() {
        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
            throwExceptionDependingOnStorageState();
        }

        // We block the busy lock, so that all current operations with the storage are completed before the data is cleared.
        busyLock.block();

        try (WriteBatch writeBatch = new WriteBatch()) {
            clearStorageData(writeBatch);

            updateLastApplied(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);

            sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);

            return nullCompletedFuture();
        } catch (Exception e) {
            throw new IgniteInternalException(
                    TX_STATE_STORAGE_REBALANCE_ERR,
                    format("Failed to start rebalance: [{}]", createStorageInfo()),
                    e
            );
        } finally {
            busyLock.unblock();
        }
    }

    @Override
    public CompletableFuture<Void> abortRebalance() {
        if (state.get() != StorageState.REBALANCE) {
            // Nothing to abort.
            return nullCompletedFuture();
        }

        try (WriteBatch writeBatch = new WriteBatch()) {
            clearStorageData(writeBatch);

            writeBatch.delete(lastAppliedIndexAndTermKey);

            sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);

            lastAppliedIndex = 0;
            lastAppliedTerm = 0;

            state.set(StorageState.RUNNABLE);
        } catch (Exception e) {
            throw new IgniteInternalException(
                    TX_STATE_STORAGE_REBALANCE_ERR,
                    format("Failed to abort rebalance: [{}]", createStorageInfo()),
                    e
            );
        }

        return nullCompletedFuture();
    }

    @Override
    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
        if (state.get() != StorageState.REBALANCE) {
            throw new IgniteInternalException(
                    TX_STATE_STORAGE_REBALANCE_ERR,
                    format("Rebalancing has not started: [{}]", createStorageInfo())
            );
        }

        try (WriteBatch writeBatch = new WriteBatch()) {
            updateLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);

            sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);

            state.set(StorageState.RUNNABLE);
        } catch (Exception e) {
            throw new IgniteInternalException(
                    TX_STATE_STORAGE_REBALANCE_ERR,
                    format("Failed to finish rebalance: [{}]", createStorageInfo()),
                    e
            );
        }

        return nullCompletedFuture();
    }

    @Override
    public CompletableFuture<Void> clear() {
        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLEANUP)) {
            throwExceptionDependingOnStorageState();
        }

        // We changed the status and wait for all current operations (together with cursors) with the storage to be completed.
        busyLock.block();

        try (WriteBatch writeBatch = new WriteBatch()) {
            clearStorageData(writeBatch);

            updateLastApplied(writeBatch, 0, 0);

            sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);

            return nullCompletedFuture();
        } catch (RocksDBException e) {
            throw new IgniteInternalException(
                    TX_STATE_STORAGE_ERR,
                    format("Failed to cleanup storage: [{}]", createStorageInfo()),
                    e
            );
        } finally {
            state.set(StorageState.RUNNABLE);

            busyLock.unblock();
        }
    }

    /** Adds a range delete of this partition's whole key range to the given batch. */
    private void clearStorageData(WriteBatch writeBatch) throws RocksDBException {
        writeBatch.deleteRange(partitionStartPrefix(), partitionEndPrefix());
    }

    /** Adds a put of the index+term key to the batch and refreshes the on-heap cached values. */
    private void updateLastApplied(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
        writeBatch.put(lastAppliedIndexAndTermKey, indexAndTermToBytes(lastAppliedIndex, lastAppliedTerm));

        this.lastAppliedIndex = lastAppliedIndex;
        this.lastAppliedTerm = lastAppliedTerm;
    }

    /**
     * Tries to close the storage with resources if it hasn't already been closed.
     *
     * @return {@code True} if the storage was successfully closed, otherwise the storage has already been closed.
     */
    private boolean tryToCloseStorageAndResources() {
        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
            StorageState state = this.state.get();

            assert state == StorageState.CLOSED : state;

            return false;
        }

        // Wait for all in-flight operations to finish before tearing down native resources.
        busyLock.block();

        RocksUtils.closeAll(iterators);

        iterators.clear();

        return true;
    }

    /** Throws {@link IgniteInternalException} if the storage is currently being rebalanced. */
    private void throwExceptionIfStorageInProgressOfRebalance() {
        if (state.get() == StorageState.REBALANCE) {
            throw createStorageInProgressOfRebalanceException();
        }
    }

    private IgniteInternalException createStorageInProgressOfRebalanceException() {
        return new IgniteInternalException(
                TX_STATE_STORAGE_REBALANCE_ERR,
                format("Storage is in the process of rebalance: [{}]", createStorageInfo())
        );
    }

    /** Throws an exception whose error code and message match the current non-runnable storage state. */
    private void throwExceptionDependingOnStorageState() {
        StorageState state = this.state.get();

        switch (state) {
            case CLOSED:
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_STOPPED_ERR,
                        format("Transaction state storage is stopped: [{}]", createStorageInfo())
                );
            case REBALANCE:
                throw createStorageInProgressOfRebalanceException();
            case CLEANUP:
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_ERR,
                        format("Storage is in the process of cleanup: [{}]", createStorageInfo())
                );
            default:
                throw new IgniteInternalException(
                        TX_STATE_STORAGE_ERR,
                        format("Unexpected state: [{}, state={}]", createStorageInfo(), state)
                );
        }
    }

    /** Human-readable storage identification used in error messages. */
    private String createStorageInfo() {
        return "table=" + tableStorage.id + ", partitionId=" + partitionId;
    }

    /**
     * Runs the supplier under the busy lock, throwing a state-dependent exception if the storage is blocked
     * (closed, rebalancing or cleaning up).
     */
    private <V> V busy(Supplier<V> supplier) {
        if (!busyLock.enterBusy()) {
            throwExceptionDependingOnStorageState();
        }

        try {
            return supplier.get();
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Storage states.
     */
    private enum StorageState {
        /** Storage is running. */
        RUNNABLE,

        /** Storage is in the process of being closed or has already closed. */
        CLOSED,

        /** Storage is in the process of being rebalanced. */
        REBALANCE,

        /** Storage is in the process of cleanup. */
        CLEANUP
    }
}