blob: 7a4102656e3f4e078f39424da6f1347356e0f3f0 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.metastorage.server.persistence;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.getAsLongs;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.keyToRocksKey;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longsToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.revisionFromRocksKey;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.STARTING_STORAGE_ERR;
import static org.rocksdb.util.SizeUnit.MB;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.metastorage.impl.EntryImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
import org.apache.ignite.internal.metastorage.server.Statement;
import org.apache.ignite.internal.metastorage.server.Value;
import org.apache.ignite.internal.metastorage.server.Watch;
import org.apache.ignite.internal.metastorage.server.WatchProcessor;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.rocksdb.AbstractNativeReference;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
* Key-value storage based on RocksDB. Keys are stored with revision. Values are stored in the default column family with an update counter
* and a boolean flag which represents whether this record is a tombstone.
* <br>
* Key: [8 bytes revision, N bytes key itself].
* <br>
* Value: [8 bytes update counter, 1 byte tombstone flag, N bytes value].
* <br>
* The mapping from the key to the set of the storage's revisions is stored in the "index" column family. A key represents the key of an
* entry and the value is a {@code byte[]} that represents a {@code long[]} where every item is a revision of the storage.
public class RocksDbKeyValueStorage implements KeyValueStorage {
private static final IgniteLogger LOG = Loggers.forClass(RocksDbKeyValueStorage.class);
/** A revision to store with system entries. */
private static final long SYSTEM_REVISION_MARKER_VALUE = 0;
/** Revision key. */
private static final byte[] REVISION_KEY = keyToRocksKey(
/** Update counter key. */
private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey(
/** Lexicographic order comparator. */
private static final Comparator<byte[]> CMP = Arrays::compareUnsigned;
static {
/** RW lock. */
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
/** Thread-pool for snapshot operations execution. */
private final ExecutorService snapshotExecutor;
/** Path to the rocksdb database. */
private final Path dbPath;
/** RocksDB options. */
private volatile DBOptions options;
/** RocksDb instance. */
private volatile RocksDB db;
/** Data column family. */
private volatile ColumnFamily data;
/** Index column family. */
private volatile ColumnFamily index;
/** Timestamp to revision mapping column family. */
private volatile ColumnFamily tsToRevision;
/** Revision to timestamp mapping column family. */
private volatile ColumnFamily revisionToTs;
/** Snapshot manager. */
private volatile RocksSnapshotManager snapshotManager;
* Revision listener for recovery only. Notifies {@link MetaStorageManagerImpl} of revision update.
* Guarded by {@link #rwLock}.
private @Nullable LongConsumer recoveryRevisionListener;
* Revision. Will be incremented for each single-entry or multi-entry update operation.
* <p>Multi-threaded access is guarded by {@link #rwLock}.
private long rev;
* Update counter. Will be incremented for each update of any particular entry.
* <p>Multi-threaded access is guarded by {@link #rwLock}.
private long updCntr;
/** Watch processor. */
private final WatchProcessor watchProcessor;
/** Status of the watch recovery process. */
private enum RecoveryStatus {
* Current status of the watch recovery process. Watch recovery is needed for replaying missed updates when {@link #startWatches}
* is called.
private final AtomicReference<RecoveryStatus> recoveryStatus = new AtomicReference<>(RecoveryStatus.INITIAL);
* Buffer used to cache new events while an event replay is in progress. After replay finishes, the cache gets drained and is never
* used again.
* <p>Multi-threaded access is guarded by {@link #rwLock}.
private List<UpdatedEntries> eventCache;
* Current list of updated entries.
* <p>Since this list gets read and updated only on writes (under a write lock), no extra synchronisation is needed.
private final UpdatedEntries updatedEntries = new UpdatedEntries();
/** Tracks RocksDb resources that must be properly closed. */
private List<AbstractNativeReference> rocksResources = new ArrayList<>();
* Constructor.
* @param nodeName Node name.
* @param dbPath RocksDB path.
public RocksDbKeyValueStorage(String nodeName, Path dbPath, FailureProcessor failureProcessor) {
this.dbPath = dbPath;
this.watchProcessor = new WatchProcessor(nodeName, this::get, failureProcessor);
this.snapshotExecutor = Executors.newFixedThreadPool(2, NamedThreadFactory.create(nodeName, "metastorage-snapshot-executor", LOG));
// Starts the storage. An IOException or RocksDBException raised while starting is rethrown as a
// MetaStorageException with the STARTING_STORAGE_ERR code.
// NOTE(review): the statements inside the try and finally blocks are not visible in this view.
public void start() {
try {
// Delete existing data, relying on the raft's snapshot and log playback
} catch (IOException | RocksDBException e) {
throw new MetaStorageException(STARTING_STORAGE_ERR, "Failed to start the storage", e);
} finally {
// Builds the descriptors of the four column families, in the order DATA, INDEX, TS_TO_REVISION, REVISION_TO_TS.
// Every column family's options are derived from the same base options.
// NOTE(review): several option setters that the comments below describe are not visible in this view.
private List<ColumnFamilyDescriptor> cfDescriptors() {
Options baseOptions = new Options()
// Lowering the desired number of levels will, on average, lead to less lookups in files, making reads faster.
// Protect ourselves from slower flushes during the peak write load.
.setTableFormatConfig(new BlockBasedTableConfig()
// Speed-up key lookup in levels by adding a bloom filter and always caching it for level 0.
// This improves the access time to keys from lower levels. 12 is chosen to fit into a 4kb memory chunk.
// This proved to be big enough to positively affect the performance.
.setFilterPolicy(new BloomFilter(12))
// Often helps to avoid reading data from the storage device, making reads faster.
.setBlockCache(new LRUCache(64 * MB))
ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(baseOptions)
// The prefix is the revision of an entry, so prefix length is the size of a long
ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(baseOptions);
ColumnFamilyOptions tsToRevFamilyOptions = new ColumnFamilyOptions(baseOptions);
ColumnFamilyOptions revToTsFamilyOptions = new ColumnFamilyOptions(baseOptions);
// The order of the descriptors matters: createDb() picks column family handles by position.
return List.of(
new ColumnFamilyDescriptor(DATA.nameAsBytes(), dataFamilyOptions),
new ColumnFamilyDescriptor(INDEX.nameAsBytes(), indexFamilyOptions),
new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), tsToRevFamilyOptions),
new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(), revToTsFamilyOptions)
// Creates the database-wide RocksDB options and returns them.
// NOTE(review): the setters chained after the constructor call are not visible in this view.
protected DBOptions createDbOptions() {
DBOptions options = new DBOptions()
return options;
// Creates the database: builds the column family descriptors and DB options, assigns the database handle,
// wraps the four column family handles, creates the snapshot manager and reads the persisted revision.
// NOTE(review): the database open call and the tail of the snapshot manager constructor are truncated in this view.
protected void createDb() throws RocksDBException {
List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
assert descriptors.size() == 4;
var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
options = createDbOptions();
db =, dbPath.toAbsolutePath().toString(), descriptors, handles);
// Handles are read by position, matching the descriptor order DATA, INDEX, TS_TO_REVISION, REVISION_TO_TS.
data = ColumnFamily.wrap(db, handles.get(0));
index = ColumnFamily.wrap(db, handles.get(1));
tsToRevision = ColumnFamily.wrap(db, handles.get(2));
revisionToTs = ColumnFamily.wrap(db, handles.get(3));
// Snapshots cover the full key range of every column family.
snapshotManager = new RocksSnapshotManager(db,
List.of(fullRange(data), fullRange(index), fullRange(tsToRevision), fullRange(revisionToTs)),
// Pick up the revision stored under the system key, if there is one.
byte[] revision = data.get(REVISION_KEY);
if (revision != null) {
rev = ByteUtils.bytesToLong(revision);
* Notifies of revision update.
* Must be called under the {@link #rwLock}.
// Acts only when a recovery revision listener is set.
// NOTE(review): the statement that invokes the listener is not visible in this view.
private void notifyRevisionUpdate() {
if (recoveryRevisionListener != null) {
// The listener must be invoked only during recovery; after recovery the listener must be null.
* Clear the RocksDB instance.
* @throws IOException If failed.
// Clears the on-disk RocksDB instance.
// NOTE(review): the statement performing the deletion is not visible in this view; the comment below only
// explains why RocksDB#destroyDB is not used.
protected void destroyRocksDb() throws IOException {
// For unknown reasons, RocksDB#destroyDB(String, Options) throws RocksDBException with ".../LOCK: No such file or directory".
// Closes the storage. The snapshot executor is shut down first, waiting up to 10 seconds for termination.
// NOTE(review): the statements inside the try and finally blocks are not visible in this view.
public void close() {
IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);
try {
} finally {
// Resets the list of tracked RocksDB resources to a fresh empty list.
// NOTE(review): the statement that closes the previously tracked resources is not visible in this view.
private void closeRocksResources() {
this.rocksResources = new ArrayList<>();
public CompletableFuture<Void> snapshot(Path snapshotPath) {
return snapshotManager.createSnapshot(snapshotPath);
// Restores the storage from a snapshot. Any exception raised in the process is rethrown as a
// MetaStorageException with the RESTORING_STORAGE_ERR code.
// NOTE(review): locking, the re-creation of the database and the code consuming `path` are not visible in this view.
public void restoreSnapshot(Path path) {
long currentRevision;
try {
// there's no way to easily remove all data from RocksDB, so we need to re-create it from scratch
// Reload the in-memory revision and update counter from the system keys of the data column family.
currentRevision = bytesToLong(data.get(REVISION_KEY));
rev = currentRevision;
updCntr = bytesToLong(data.get(UPDATE_COUNTER_KEY));
} catch (Exception e) {
throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to restore snapshot", e);
} finally {
// Returns the current in-memory revision of the storage.
// NOTE(review): the lock acquisition/release around the read is not visible in this view.
public long revision() {
try {
return rev;
} finally {
// Returns the current in-memory update counter of the storage.
// NOTE(review): the lock acquisition/release around the read is not visible in this view.
public long updateCounter() {
try {
return updCntr;
} finally {
// Writes a single key-value pair under the next revision (rev + 1) with the next update counter (updCntr + 1).
// A RocksDBException is rethrown as a MetaStorageException with the OP_EXECUTION_ERR code.
// NOTE(review): the lock acquisition/release around the batch is not visible in this view.
public void put(byte[] key, byte[] value, HybridTimestamp opTs) {
try (WriteBatch batch = new WriteBatch()) {
long curRev = rev + 1;
long cntr = updCntr + 1;
// Stage the value, then the key's revision list, then the system values; the last call also writes the batch.
addDataToBatch(batch, key, value, curRev, cntr);
updateKeysIndex(batch, key, curRev);
fillAndWriteBatch(batch, curRev, cntr, opTs);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
* Adds a revision to the keys index.
* @param batch Write batch.
* @param key Key.
* @param curRev New revision for key.
private void updateKeysIndex(WriteBatch batch, byte[] key, long curRev) {
try {
// Get the revisions current value
byte @Nullable [] array = index.get(key);
// Store the new value
index.put(batch, key, appendLong(array, curRev));
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
* Fills the batch with system values (the update counter and the revision) and writes it to the db.
* @param batch Write batch.
* @param newRev New revision.
* @param newCntr New update counter.
* @param ts Operation's timestamp.
* @throws RocksDBException If failed.
// Adds the system values (update counter and revision) to the batch, writes the batch to the database and then
// advances the in-memory revision and update counter.
// NOTE(review): statements may be missing from the end of this method in this view.
private void fillAndWriteBatch(WriteBatch batch, long newRev, long newCntr, @Nullable HybridTimestamp ts) throws RocksDBException {
// Meta-storage recovery is based on the snapshot & external log. WAL is never used for recovery, and can be safely disabled.
try (WriteOptions opts = new WriteOptions().setDisableWAL(true)) {
byte[] revisionBytes = longToBytes(newRev);
data.put(batch, UPDATE_COUNTER_KEY, longToBytes(newCntr));
data.put(batch, REVISION_KEY, revisionBytes);
// The timestamp <-> revision mappings are stored only when a timestamp is supplied.
if (ts != null) {
byte[] tsBytes = hybridTsToArray(ts);
tsToRevision.put(batch, tsBytes, revisionBytes);
revisionToTs.put(batch, revisionBytes, tsBytes);
db.write(opts, batch);
// The in-memory state is updated only after the write call has returned.
rev = newRev;
updCntr = newCntr;
updatedEntries.ts = ts;
private static byte[] hybridTsToArray(HybridTimestamp ts) {
return longToBytes(ts.longValue());
private static Entry entry(byte[] key, long revision, Value value) {
return value.tombstone()
? EntryImpl.tombstone(key, revision, value.updateCounter())
: new EntryImpl(key, value.bytes(), revision, value.updateCounter());
// Writes several key-value pairs under a single new revision (rev + 1). A RocksDBException is rethrown as a
// MetaStorageException with the OP_EXECUTION_ERR code.
// NOTE(review): the lock acquisition/release around the batch is not visible in this view.
public void putAll(List<byte[]> keys, List<byte[]> values, HybridTimestamp opTs) {
try (WriteBatch batch = new WriteBatch()) {
long curRev = rev + 1;
// addAllToBatch returns the update counter value to persist with this batch.
long counter = addAllToBatch(batch, keys, values, curRev);
// Every written key gets the new revision appended to its revision list.
for (byte[] key : keys) {
updateKeysIndex(batch, key, curRev);
fillAndWriteBatch(batch, curRev, counter, opTs);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
// Returns the entry for the key, using the current storage revision as the revision upper bound.
// NOTE(review): the lock acquisition/release around the read is not visible in this view.
public Entry get(byte[] key) {
try {
return doGet(key, rev);
} finally {
// Returns the entry for the key with the greatest revision not exceeding revUpperBound.
// NOTE(review): the lock acquisition/release around the read is not visible in this view.
public Entry get(byte[] key, long revUpperBound) {
try {
return doGet(key, revUpperBound);
} finally {
// Returns the entries of the key whose revisions lie within [revLowerBound, revUpperBound].
// NOTE(review): the lock acquisition/release around the read is not visible in this view.
public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound) {
try {
return doGet(key, revLowerBound, revUpperBound);
} finally {
// Returns one entry per key, using the current storage revision as the revision upper bound.
// NOTE(review): the lock acquisition/release around the read is not visible in this view.
public Collection<Entry> getAll(List<byte[]> keys) {
try {
return doGetAll(keys, rev);
} finally {
// Returns one entry per key, each with the greatest revision not exceeding revUpperBound.
// NOTE(review): the lock acquisition/release around the read is not visible in this view.
public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
try {
return doGetAll(keys, revUpperBound);
} finally {
// Removes a key by staging a tombstone under the next revision. A RocksDBException is rethrown as a
// MetaStorageException with the OP_EXECUTION_ERR code.
// NOTE(review): the lock acquisition/release and the closing of the `if` block are not visible in this view.
public void remove(byte[] key, HybridTimestamp opTs) {
try (WriteBatch batch = new WriteBatch()) {
long curRev = rev + 1;
long counter = updCntr + 1;
// addToBatchForRemoval returns false when the key has no entry or is already a tombstone.
if (addToBatchForRemoval(batch, key, curRev, counter)) {
updateKeysIndex(batch, key, curRev);
fillAndWriteBatch(batch, curRev, counter, opTs);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
// Removes several keys under a single new revision (rev + 1). A RocksDBException is rethrown as a
// MetaStorageException with the OP_EXECUTION_ERR code.
// NOTE(review): the statements that advance `counter` and populate `existingKeys`, and the lock handling,
// are not visible in this view.
public void removeAll(List<byte[]> keys, HybridTimestamp opTs) {
try (WriteBatch batch = new WriteBatch()) {
long curRev = rev + 1;
List<byte[]> existingKeys = new ArrayList<>(keys.size());
long counter = updCntr;
for (byte[] key : keys) {
// A tombstone is staged only for keys that currently hold a non-tombstone entry.
if (addToBatchForRemoval(batch, key, curRev, counter + 1)) {
// Only the keys collected in existingKeys get the new revision appended to their revision list.
for (byte[] key : existingKeys) {
updateKeysIndex(batch, key, curRev);
fillAndWriteBatch(batch, curRev, counter, opTs);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
// Conditional update: tests the condition against the current entries of its keys, applies the `success`
// operations when it holds and the `failure` operations otherwise, and returns the condition's result.
// A RocksDBException is rethrown as a MetaStorageException with the OP_EXECUTION_ERR code.
// NOTE(review): the lock acquisition/release is not visible in this view.
public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure, HybridTimestamp opTs) {
try {
Entry[] entries = getAll(Arrays.asList(condition.keys())).toArray(new Entry[]{});
boolean branch = condition.test(entries);
Collection<Operation> ops = branch ? success : failure;
applyOperations(ops, opTs);
return branch;
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
// Multi-invoke: walks the chain of nested If statements iteratively. On each step the condition is tested against
// the current entries of its keys and the matching branch is chosen. A terminal branch has its update applied and
// its result returned; otherwise the walk continues with the nested If.
// A RocksDBException is rethrown as a MetaStorageException with the OP_EXECUTION_ERR code.
// NOTE(review): the lock acquisition/release and part of the exception constructor call are not visible in this view.
public StatementResult invoke(If iif, HybridTimestamp opTs) {
try {
If currIf = iif;
// Upper limit on the number of If statements visited by the loop below.
byte maximumNumOfNestedBranch = 100;
while (true) {
// NOTE(review): the counter is post-decremented before the message is built, so the message reports the
// decremented value (e.g. -1) rather than the limit of 100 - confirm and fix the message.
if (maximumNumOfNestedBranch-- <= 0) {
throw new MetaStorageException(
"Too many nested (" + maximumNumOfNestedBranch + ") statements in multi-invoke command.");
Entry[] entries = getAll(Arrays.asList(currIf.cond().keys())).toArray(new Entry[]{});
Statement branch = (currIf.cond().test(entries)) ? currIf.andThen() : currIf.orElse();
if (branch.isTerminal()) {
Update update = branch.update();
applyOperations(update.operations(), opTs);
return update.result();
} else {
currIf = branch.iif();
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
} finally {
// Applies a collection of PUT / REMOVE / NO_OP operations under a single new revision (rev + 1). The key index and
// the system values are written only if at least one operation modified the storage.
// NOTE(review): the `break` statements, the statements that advance `counter` and fill `updatedKeys`, and the
// `default` label preceding the "Unknown operation type" throw are not visible in this view.
private void applyOperations(Collection<Operation> ops, HybridTimestamp opTs) throws RocksDBException {
long curRev = rev + 1;
boolean modified = false;
long counter = updCntr;
List<byte[]> updatedKeys = new ArrayList<>();
try (WriteBatch batch = new WriteBatch()) {
for (Operation op : ops) {
byte[] key = op.key();
switch (op.type()) {
case PUT:
addDataToBatch(batch, key, op.value(), curRev, counter);
modified = true;
case REMOVE:
// Removal of a missing or already removed key stages nothing in the batch.
boolean removed = addToBatchForRemoval(batch, key, curRev, counter);
if (!removed) {
} else {
modified |= removed;
case NO_OP:
throw new MetaStorageException(OP_EXECUTION_ERR, "Unknown operation type: " + op.type());
if (modified) {
for (byte[] key : updatedKeys) {
updateKeysIndex(batch, key, curRev);
fillAndWriteBatch(batch, curRev, counter, opTs);
// Returns a cursor over the key range, using the current storage revision as the revision upper bound.
// NOTE(review): the lock acquisition/release is not visible in this view.
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
try {
return range(keyFrom, keyTo, rev);
} finally {
// Returns a cursor over the index column family. For each key it yields the entry with the greatest revision not
// exceeding revUpperBound, skipping keys for which the resolved entry is empty.
// NOTE(review): the statements that apply `upperBound` to the read options and position the iterator at `keyFrom`,
// and all lock handling, are not visible in this view.
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
try {
var readOpts = new ReadOptions();
// No upper bound slice is created when keyTo is null.
var upperBound = keyTo == null ? null : new Slice(keyTo);
RocksIterator iterator = index.newIterator(readOpts);;
return new RocksIteratorAdapter<>(iterator) {
/** Cached entry used to filter "empty" values. */
private Entry next;
public boolean hasNext() {
// An entry has already been looked up and not consumed yet.
if (next != null) {
return true;
// Advance until a non-empty entry is found or the underlying iterator is exhausted.
while (next == null && super.hasNext()) {
Entry nextCandidate = decodeEntry(it.key(), it.value());;
if (!nextCandidate.empty()) {
next = nextCandidate;
return true;
return false;
public Entry next() {
if (!hasNext()) {
throw new NoSuchElementException();
Entry result = next;
assert result != null;
// Clear the cache so that the following hasNext() call looks up a new entry.
next = null;
return result;
protected Entry decodeEntry(byte[] key, byte[] value) {
// The index value is the list of revisions of the key.
long[] revisions = getAsLongs(value);
long targetRevision = maxRevision(revisions, revUpperBound);
// No revision of this key fits under the bound.
if (targetRevision == -1) {
return EntryImpl.empty(key);
// This is not a correct approach for using locks in terms of compaction correctness (we should block compaction for the
// whole iteration duration). However, compaction is not fully implemented yet, so this lock is taken for consistency
// sake. This part must be rewritten in the future.
try {
return doGetValue(key, targetRevision);
} finally {
public void close() {
// Release the native read options and the upper bound slice.
RocksUtils.closeAll(readOpts, upperBound);
} finally {
// Registers a watch with a key-range predicate, starting from the given revision. A null keyTo selects a
// predicate variant with a single comparison instead of two.
// NOTE(review): the comparison expressions inside both lambdas are truncated in this view.
public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, WatchListener listener) {
assert keyFrom != null : "keyFrom couldn't be null.";
assert rev > 0 : "rev must be positive.";
Predicate<byte[]> rangePredicate = keyTo == null
? k ->, k) <= 0
: k ->, k) <= 0 &&, k) > 0;
watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
// Registers a watch for a single key, starting from the given revision.
// NOTE(review): the comparison expression inside the lambda is truncated in this view.
public void watchExact(byte[] key, long rev, WatchListener listener) {
assert key != null : "key couldn't be null.";
assert rev > 0 : "rev must be positive.";
Predicate<byte[]> exactPredicate = k ->, key) == 0;
watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
// Registers a watch for a set of keys, starting from the given revision. The predicate is a membership test on a
// TreeSet ordered by CMP; a comparator is needed because byte arrays have no natural ordering.
// NOTE(review): the statement that copies `keys` into `keySet` is not visible in this view.
public void watchExact(Collection<byte[]> keys, long rev, WatchListener listener) {
assert keys != null && !keys.isEmpty() : "keys couldn't be null or empty: " + keys;
assert rev > 0 : "rev must be positive.";
TreeSet<byte[]> keySet = new TreeSet<>(CMP);
Predicate<byte[]> inPredicate = keySet::contains;
watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
// Starts watch processing. The current revision is captured first; if it is non-zero, the updates between
// startRevision and the captured revision are replayed afterwards.
// NOTE(review): lock handling, the recovery status transitions and the use of `revisionCallback` are not visible
// in this view.
public void startWatches(long startRevision, OnRevisionAppliedCallback revisionCallback) {
assert startRevision != 0 : "First meaningful revision is 1";
long currentRevision;
try {
currentRevision = rev;
// We update the recovery status under the read lock in order to avoid races between starting watches and applying a snapshot
// or concurrent writes. Replay of events can be done outside of the read lock relying on RocksDB snapshot isolation.
if (currentRevision == 0) {
} else {
// If revision is not 0, we need to replay updates that match the existing data.
} finally {
// The replay runs after the try/finally block above.
if (currentRevision != 0) {
replayUpdates(startRevision, currentRevision);
// Presumably unregisters the watch associated with the listener.
// NOTE(review): the method body is not visible in this view - confirm.
public void removeWatch(WatchListener listener) {
// Compacts the storage up to the revision that corresponds to the low watermark timestamp. Every key of the index
// column family is compacted against that revision. A RocksDBException is rethrown as a MetaStorageException with
// the COMPACTION_ERR code.
// NOTE(review): the iterator positioning that uses `tsBytes`, the early return for "nothing to compact" and the lock
// handling are not visible in this view.
public void compact(HybridTimestamp lowWatermark) {
try (WriteBatch batch = new WriteBatch()) {
byte[] tsBytes = hybridTsToArray(lowWatermark);
long maxRevision;
// Find a revision with a timestamp less than or equal to the watermark.
try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
byte[] tsValue = rocksIterator.value();
if (tsValue.length == 0) {
// Nothing to compact yet.
maxRevision = bytesToLong(tsValue);
try (RocksIterator iterator = index.newIterator()) {
RocksUtils.forEach(iterator, (key, value) -> compactForKey(batch, key, getAsLongs(value), maxRevision));
// Revision and update counter are passed unchanged and no timestamp is supplied.
fillAndWriteBatch(batch, rev, updCntr, null);
} catch (RocksDBException e) {
throw new MetaStorageException(COMPACTION_ERR, e);
} finally {
public byte @Nullable [] nextKey(byte[] key) {
return incrementPrefix(key);
* Adds a key to a batch marking the value as a tombstone.
* @param batch Write batch.
* @param key Target key.
* @param curRev Revision.
* @param counter Update counter.
* @return {@code true} if an entry can be deleted.
* @throws RocksDBException If failed.
private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev, long counter) throws RocksDBException {
Entry e = doGet(key, curRev);
if (e.empty() || e.tombstone()) {
return false;
addDataToBatch(batch, key, TOMBSTONE, curRev, counter);
return true;
* Compacts all entries by the given key, removing revisions that are no longer needed.
* The last entry with a revision less than or equal to the {@code minRevisionToKeep} and all subsequent entries will be preserved.
* If the first entry to keep is a tombstone, it will be removed.
* Example:
* <pre>
* Example 1:
* put entry1: revision 5
* put entry2: revision 7
* do compaction: revision 6
* entry1: exists
* entry2: exists
* Example 2:
* put entry1: revision 5
* put entry2: revision 7
* do compaction: revision 7
* entry1: doesn't exist
* entry2: exists
* </pre>
* @param batch Write batch.
* @param key Target key.
* @param revs Revisions.
* @param minRevisionToKeep Minimum revision that should be kept.
* @throws RocksDBException If failed.
// Compacts the revisions of a single key: stages deletion of the data stored under revisions that precede the
// first revision to keep, then either rewrites the key's revision list in the index or deletes the key from it.
// NOTE(review): several single-statement lines (early returns, the loop `break`, the increments/decrements of
// `idxToKeepFrom`) are not visible in this view, so the exact index arithmetic cannot be confirmed here.
// NOTE(review): the locals named `rev` below shadow the `rev` field of the class.
private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long minRevisionToKeep) throws RocksDBException {
if (revs.length < 2) {
// If we have less than two revisions, there is no point in compaction.
// Index of the first revision we will be keeping in the array of revisions.
int idxToKeepFrom = 0;
// Whether there is an entry with the minRevisionToKeep.
boolean hasMinRevision = false;
// Traverse revisions, looking for the first revision that needs to be kept.
for (long rev : revs) {
if (rev >= minRevisionToKeep) {
if (rev == minRevisionToKeep) {
hasMinRevision = true;
if (!hasMinRevision) {
// Minimal revision was not encountered, which means that we are between revisions of a key, so the previous
// revision must be preserved.
if (idxToKeepFrom <= 0) {
// All revisions are still in use.
for (int i = 0; i < idxToKeepFrom; i++) {
// This revision is not needed anymore, remove data.
data.delete(batch, keyToRocksKey(revs[i], key));
// Whether we only have the last revision (even if it's less than or equal to the watermark).
boolean onlyLastRevisionLeft = idxToKeepFrom == (revs.length - 1);
// Get the number of the first revision that will be kept.
long rev = onlyLastRevisionLeft ? lastRevision(revs) : revs[idxToKeepFrom];
byte[] rocksKey = keyToRocksKey(rev, key);
Value value = bytesToValue(data.get(rocksKey));
if (value.tombstone()) {
// The first revision we are going to keep is a tombstone, we may delete it.
data.delete(batch, rocksKey);
if (!onlyLastRevisionLeft) {
// First revision was a tombstone, but there are other revisions, that need to be kept,
// so advance index of the first revision we need to keep.
if (onlyLastRevisionLeft && value.tombstone()) {
// We don't have any previous revisions for this entry and the single existing is a tombstone,
// so we can remove it from index.
index.delete(batch, key);
} else {
// Keeps revisions starting with idxToKeepFrom.
index.put(batch, key, longsToBytes(revs, idxToKeepFrom));
* Gets all entries with given keys and a revision.
* @param keys Target keys.
* @param rev Target revision.
* @return Collection of entries.
private Collection<Entry> doGetAll(Collection<byte[]> keys, long rev) {
assert keys != null : "keys list can't be null.";
assert !keys.isEmpty() : "keys list can't be empty.";
assert rev >= 0;
var res = new ArrayList<Entry>(keys.size());
for (byte[] key : keys) {
res.add(doGet(key, rev));
return res;
* Gets the value by key and revision.
* @param key Target key.
* @param revUpperBound Target upper bound of revision.
* @return Value.
private Entry doGet(byte[] key, long revUpperBound) {
assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + revUpperBound + ']';
long[] revs;
try {
revs = getRevisions(key);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
if (revs.length == 0) {
return EntryImpl.empty(key);
long lastRev = maxRevision(revs, revUpperBound);
// lastRev can be -1 if maxRevision return -1.
if (lastRev == -1) {
return EntryImpl.empty(key);
return doGetValue(key, lastRev);
* Returns all entries corresponding to the given key and bounded by given revisions.
* All these entries are ordered by revisions and have the same key.
* The lower bound and the upper bound are inclusive.
* @param key The key.
* @param revLowerBound The lower bound of revision.
* @param revUpperBound The upper bound of revision.
* @return Entries corresponding to the given key.
private List<Entry> doGet(byte[] key, long revLowerBound, long revUpperBound) {
assert revLowerBound >= 0 : "Invalid arguments: [revLowerBound=" + revLowerBound + ']';
assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + revUpperBound + ']';
assert revUpperBound >= revLowerBound
: "Invalid arguments: [revLowerBound=" + revLowerBound + ", revUpperBound=" + revUpperBound + ']';
// TODO: IGNITE-19782 throw CompactedException if revLowerBound is compacted.
long[] revs;
try {
revs = getRevisions(key);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
if (revs.length == 0) {
return Collections.emptyList();
int firstRevIndex = minRevisionIndex(revs, revLowerBound);
int lastRevIndex = maxRevisionIndex(revs, revUpperBound);
// firstRevIndex can be -1 if minRevisionIndex return -1. lastRevIndex can be -1 if maxRevisionIndex return -1.
if (firstRevIndex == -1 || lastRevIndex == -1) {
return Collections.emptyList();
List<Entry> entries = new ArrayList<>();
for (int i = firstRevIndex; i <= lastRevIndex; i++) {
entries.add(doGetValue(key, revs[i]));
return entries;
* Get a list of the revisions of the entry corresponding to the key.
* @param key Key.
* @return Array of revisions.
* @throws RocksDBException If failed to perform {@link RocksDB#get(ColumnFamilyHandle, byte[])}.
// Reads the value stored for the key in the index column family and decodes it into an array of revisions.
// NOTE(review): the body of the null branch is not visible in this view; callers read `.length` on the result
// without a null check, so a non-null array is presumably returned for an absent key - confirm.
private long[] getRevisions(byte[] key) throws RocksDBException {
byte[] revisions = index.get(key);
if (revisions == null) {
return getAsLongs(revisions);
* Returns maximum revision which must be less or equal to {@code upperBoundRev}. If there is no such revision then {@code -1} will be
* returned.
* @param revs Revisions list.
* @param upperBoundRev Revision upper bound.
* @return Maximum revision or {@code -1} if there is no such revision.
private static long maxRevision(long[] revs, long upperBoundRev) {
for (int i = revs.length - 1; i >= 0; i--) {
long rev = revs[i];
if (rev <= upperBoundRev) {
return rev;
return -1;
* Returns index of minimum revision which must be greater or equal to {@code lowerBoundRev}.
* If there is no such revision then {@code -1} will be returned.
* @param revs Revisions list.
* @param lowerBoundRev Revision lower bound.
* @return Index of minimum revision or {@code -1} if there is no such revision.
private static int minRevisionIndex(long[] revs, long lowerBoundRev) {
for (int i = 0; i < revs.length; i++) {
long rev = revs[i];
if (rev >= lowerBoundRev) {
return i;
return -1;
* Returns index of maximum revision which must be less or equal to {@code upperBoundRev}.
* If there is no such revision then {@code -1} will be returned.
* @param revs Revisions list.
* @param upperBoundRev Revision upper bound.
* @return Index of maximum revision or {@code -1} if there is no such revision.
private static int maxRevisionIndex(long[] revs, long upperBoundRev) {
for (int i = revs.length - 1; i >= 0; i--) {
long rev = revs[i];
if (rev <= upperBoundRev) {
return i;
return -1;
* Gets the value by a key and a revision.
* @param key Target key.
* @param revision Target revision.
* @return Entry.
private Entry doGetValue(byte[] key, long revision) {
if (revision == 0) {
return EntryImpl.empty(key);
byte[] valueBytes;
try {
valueBytes = data.get(keyToRocksKey(revision, key));
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
if (valueBytes == null || valueBytes.length == 0) {
return EntryImpl.empty(key);
Value lastVal = bytesToValue(valueBytes);
if (lastVal.tombstone()) {
return EntryImpl.tombstone(key, revision, lastVal.updateCounter());
return new EntryImpl(key, lastVal.bytes(), revision, lastVal.updateCounter());
* Adds an entry to the batch.
* @param batch Write batch.
* @param key Key.
* @param value Value.
* @param curRev Revision.
* @param cntr Update counter.
* @throws RocksDBException If failed.
private void addDataToBatch(WriteBatch batch, byte[] key, byte[] value, long curRev, long cntr) throws RocksDBException {
byte[] rocksKey = keyToRocksKey(curRev, key);
byte[] rocksValue = valueToBytes(value, cntr);
data.put(batch, rocksKey, rocksValue);
updatedEntries.add(entry(key, curRev, new Value(value, cntr)));
* Adds all entries to the batch.
* @param batch Write batch.
* @param keys Keys.
* @param values Values.
* @param curRev Revision.
* @return New update counter value.
* @throws RocksDBException If failed.
private long addAllToBatch(WriteBatch batch, List<byte[]> keys, List<byte[]> values, long curRev) throws RocksDBException {
long counter = this.updCntr;
for (int i = 0; i < keys.size(); i++) {
byte[] key = keys.get(i);
byte[] bytes = values.get(i);
addDataToBatch(batch, key, bytes, curRev, counter);
return counter;
* Gets last revision from the list.
* @param revs Revisions.
* @return Last revision.
private static long lastRevision(long[] revs) {
return revs[revs.length - 1];
/**
 * Adds modified entries to the watch event queue.
 */
// NOTE(review): this method looks truncated in this listing — the body of the emptiness check, the `case` labels of
// the switch, the statements of each branch other than the lazy cache creation, and all closing braces are not
// present. Restore them from the upstream source before relying on this code.
private void queueWatchEvent() {
// Guard on the buffer of entries modified by the current operation.
if (updatedEntries.isEmpty()) {
// What happens next depends on the current recovery status.
switch (recoveryStatus.get()) {
// Watches haven't been enabled yet, no need to queue any events, they will be replayed upon recovery.
// Buffer the event while event replay is still in progress.
// The cache of buffered events is created lazily, on first use.
if (eventCache == null) {
eventCache = new ArrayList<>();
private void notifyWatches() {
UpdatedEntries copy = updatedEntries.transfer();
assert copy.ts != null;
watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
private void replayUpdates(long lowerRevision, long upperRevision) {
long minWatchRevision = Math.max(lowerRevision, watchProcessor.minWatchRevision().orElse(-1));
if (minWatchRevision == -1 || minWatchRevision > upperRevision) {
// No events to replay, we can start processing more recent events from the event queue.
var updatedEntries = new ArrayList<Entry>();
HybridTimestamp ts = null;
try (
var upperBound = new Slice(longToBytes(upperRevision + 1));
var options = new ReadOptions().setIterateUpperBound(upperBound);
RocksIterator it = data.newIterator(options)
) {;
long lastSeenRevision = minWatchRevision;
for (; it.isValid(); {
byte[] rocksKey = it.key();
byte[] rocksValue = it.value();
long revision = revisionFromRocksKey(rocksKey);
if (revision != lastSeenRevision) {
if (!updatedEntries.isEmpty()) {
var updatedEntriesCopy = List.copyOf(updatedEntries);
assert ts != null;
watchProcessor.notifyWatches(updatedEntriesCopy, ts);
ts = timestampByRevision(revision);
lastSeenRevision = revision;
if (ts == null) {
// This will only execute on first iteration.
ts = timestampByRevision(revision);
updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, bytesToValue(rocksValue)));
// Notify about the events left after finishing the loop above.
if (!updatedEntries.isEmpty()) {
assert ts != null;
watchProcessor.notifyWatches(updatedEntries, ts);
public HybridTimestamp timestampByRevision(long revision) {
try {
byte[] tsBytes = revisionToTs.get(longToBytes(revision));
assert tsBytes != null;
return HybridTimestamp.hybridTimestamp(bytesToLong(tsBytes));
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
// Delivers every batch buffered in eventCache to the watch processor and then drops the cache.
// NOTE(review): this method looks truncated in this listing — the lock acquisition/release that the comment below
// refers to, the body of the `finally` clause, any statements following the cache reset and all closing braces are
// not present. Restore them from the upstream source before relying on this code.
private void finishReplay() {
// Take the lock to drain the event cache and prevent new events from being cached. Since event notification is asynchronous,
// this lock shouldn't be held for long.
try {
// A null cache means there is nothing to drain.
if (eventCache != null) {
eventCache.forEach(entries -> {
// Every buffered batch is expected to carry its timestamp.
assert entries.ts != null;
watchProcessor.notifyWatches(entries.updatedEntries, entries.ts);
// The cache has been handed over to the watch processor and is no longer needed.
eventCache = null;
} finally {
// Stores the given listener (which may be null) in the recoveryRevisionListener field.
// NOTE(review): this method looks truncated in this listing — whatever precedes the `try` (presumably a lock
// acquisition — confirm), the body of the `finally` clause and the closing braces are not present.
public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
try {
this.recoveryRevisionListener = listener;
} finally {
public Path getDbPath() {
return dbPath;
private static class UpdatedEntries {
private final List<Entry> updatedEntries;
private HybridTimestamp ts;
public UpdatedEntries() {
this.updatedEntries = new ArrayList<>();
private UpdatedEntries(List<Entry> updatedEntries, HybridTimestamp ts) {
this.updatedEntries = updatedEntries;
this.ts = Objects.requireNonNull(ts);
boolean isEmpty() {
return updatedEntries.isEmpty();
void add(Entry entry) {
void clear() {
ts = null;
UpdatedEntries transfer() {
assert ts != null;
UpdatedEntries transferredValue = new UpdatedEntries(new ArrayList<>(updatedEntries), ts);
return transferredValue;
// NOTE(review): the body and the closing brace of this method are not present in this listing — presumably it
// registers the listener with watchProcessor; confirm against the upstream source and restore.
public void registerRevisionUpdateListener(RevisionUpdateListener listener) {
// NOTE(review): the body and the closing brace of this method are not present in this listing — presumably it
// removes the listener from watchProcessor; confirm against the upstream source and restore.
public void unregisterRevisionUpdateListener(RevisionUpdateListener listener) {
public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) {
return watchProcessor.notifyUpdateRevisionListeners(newRevision);
public void advanceSafeTime(HybridTimestamp newSafeTime) {
try {
if (recoveryStatus.get() == RecoveryStatus.DONE) {
} finally {