blob: 4f485c3850a199d919529b953373f4001813510e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.rocksdb.snapshot;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.SstFileWriter;
* Class for creating and restoring RocksDB snapshots.
public class RocksSnapshotManager {
/** Suffix for the temporary snapshot folder. */
private static final String TMP_SUFFIX = ".tmp";
private final RocksDB db;
private final Collection<ColumnFamilyRange> ranges;
private final Executor executor;
* Creates a new instance of the snapshot manager.
* This instance <b>does not</b> own any of the provided resources and will not close them.
* @param db RocksDB instance which snapshots will be managed.
* @param ranges Key ranges of Column Families that exist in the provided {@code db} instance.
* @param executor Executor which will be used for creating snapshots.
public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) {
assert !ranges.isEmpty();
this.db = db;
this.ranges = List.copyOf(ranges);
this.executor = executor;
* Creates a snapshot of the enclosed RocksDB instance and saves it into a provided folder.
* @param snapshotDir Folder to save the snapshot into.
* @return Future that either completes successfully upon snapshot creation or signals a failure.
public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);
// The snapshot reference must be taken synchronously, otherwise we might let more writes sneak into the snapshot than needed.
Snapshot snapshot = db.getSnapshot();
return CompletableFuture.supplyAsync(
() -> {
// Create futures for capturing SST snapshots of the column families
CompletableFuture<?>[] sstFutures =
.map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
return CompletableFuture.allOf(sstFutures);
}, executor)
.whenCompleteAsync((ignored, e) -> {
// Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
// database does. Calling close to maintain the AutoCloseable semantics
if (e != null) {
// Delete snapshot directory if it already exists
try {
// Rename the temporary directory
IgniteUtils.atomicMoveFile(tmpPath, snapshotDir, null);
} catch (IOException ex) {
throw new IgniteInternalException("Failed to rename: " + tmpPath + " to " + snapshotDir, ex);
}, executor)
.thenApply(v -> null);
* Creates a temporary directory for storing intermediate results while creating a snapshot.
* @param tmpDirPath Path to the temporary directory.
private static void createTmpSnapshotDir(Path tmpDirPath) {
try {
} catch (IOException e) {
throw new IgniteInternalException("Failed to create directory: " + tmpDirPath, e);
* Creates an SST file for the column family (async version).
* @param range Column family range.
* @param snapshot Point-in-time snapshot.
* @param snapshotDir Directory to put the SST file in.
private CompletableFuture<Void> createSstFileAsync(ColumnFamilyRange range, Snapshot snapshot, Path snapshotDir) {
return CompletableFuture.runAsync(() -> createSstFile(range, snapshot, snapshotDir), executor);
* Creates an SST file for the column family.
* @param range Column family range.
* @param snapshot Point-in-time snapshot.
* @param snapshotDir Directory to put the SST file in.
private void createSstFile(ColumnFamilyRange range, Snapshot snapshot, Path snapshotDir) {
try (
EnvOptions envOptions = new EnvOptions();
Options options = new Options().setEnv(db.getEnv());
SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options);
RocksIterator it = snapshotIterator(range, snapshot)
) {
Path sstFile = snapshotDir.resolve(range.columnFamily().name());;
RocksUtils.forEach(it, sstFileWriter::put);
} catch (RocksDBException e) {
throw new IgniteInternalException("Failed to write snapshot", e);
* Creates an iterator over the provided key range.
private static RocksIterator snapshotIterator(ColumnFamilyRange range, Snapshot snapshot) {
var options = new ReadOptions().setSnapshot(snapshot);
if (range.isFullRange()) {
RocksIterator it = range.columnFamily().newIterator(options);
return it;
} else {
options.setIterateUpperBound(new Slice(range.upperBound()));
RocksIterator it = range.columnFamily().newIterator(options);;
return it;
* Restores the snapshot that was created by {@link #createSnapshot}.
* <p>This method loads the snapshot as-is, overwriting the existing keys if necessary. Most of the times storage implementations
* should manually remove all data before restoring a snapshot.
* @param snapshotDir Path to the directory where a snapshot was created.
public void restoreSnapshot(Path snapshotDir) {
try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
for (ColumnFamilyRange range : ranges) {
Path snapshotPath = snapshotDir.resolve(range.columnFamily().name());
if (!Files.exists(snapshotPath)) {
throw new IgniteInternalException("Snapshot not found: " + snapshotPath);
range.columnFamily().ingestExternalFile(List.of(snapshotPath.toString()), ingestOptions);
} catch (RocksDBException e) {
throw new IgniteInternalException("Fail to ingest sst file at path: " + snapshotDir, e);