// blob: 4f485c3850a199d919529b953373f4001813510e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.rocksdb.snapshot;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.SstFileWriter;
/**
 * Class for creating and restoring RocksDB snapshots.
 *
 * <p>A snapshot is a directory that contains one SST file per configured {@link ColumnFamilyRange}; every file is named after
 * the column family of its range.
 */
public class RocksSnapshotManager {
    /** Suffix for the temporary snapshot folder. */
    private static final String TMP_SUFFIX = ".tmp";

    /** RocksDB instance which snapshots are managed. Not owned by this class. */
    private final RocksDB db;

    /** Immutable copy of the key ranges that are written into (and restored from) a snapshot. */
    private final Collection<ColumnFamilyRange> ranges;

    /** Executor that runs every asynchronous step of the snapshot creation. */
    private final Executor executor;

    /**
     * Creates a new instance of the snapshot manager.
     * This instance <b>does not</b> own any of the provided resources and will not close them.
     *
     * @param db RocksDB instance which snapshots will be managed.
     * @param ranges Key ranges of Column Families that exist in the provided {@code db} instance. Must not be empty.
     * @param executor Executor which will be used for creating snapshots.
     */
    public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) {
        assert !ranges.isEmpty();

        this.db = db;
        this.ranges = List.copyOf(ranges);
        this.executor = executor;
    }

    /**
     * Creates a snapshot of the enclosed RocksDB instance and saves it into a provided folder.
     *
     * <p>The SST files are first written into a sibling temporary directory ({@code snapshotDir} + {@value #TMP_SUFFIX}). Only after
     * all of them have been written successfully, the existing {@code snapshotDir} (if any) is deleted and the temporary directory
     * is moved into its place. On failure the temporary directory is removed on a best-effort basis and {@code snapshotDir} is left
     * untouched.
     *
     * @param snapshotDir Folder to save the snapshot into.
     * @return Future that either completes successfully upon snapshot creation or signals a failure.
     */
    public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
        Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);

        // The snapshot reference must be taken synchronously, otherwise we might let more writes sneak into the snapshot than needed.
        Snapshot snapshot = db.getSnapshot();

        CompletableFuture<CompletableFuture<Void>> sstFilesFuture;

        try {
            sstFilesFuture = CompletableFuture.supplyAsync(
                    () -> {
                        createTmpSnapshotDir(tmpPath);

                        // Create futures for capturing SST snapshots of the column families
                        CompletableFuture<?>[] sstFutures = ranges.stream()
                                .map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
                                .toArray(CompletableFuture[]::new);

                        return CompletableFuture.allOf(sstFutures);
                    },
                    executor
            );
        } catch (RuntimeException e) {
            // The task has not been scheduled (e.g. the executor has rejected it), so the completion callback below will never
            // be registered and nobody else is going to release the snapshot.
            releaseSnapshot(snapshot);

            throw e;
        }

        CompletableFuture<Void> allSstFilesWritten = sstFilesFuture.thenCompose(Function.identity());

        return allSstFilesWritten.whenCompleteAsync((ignored, e) -> {
            releaseSnapshot(snapshot);

            if (e != null) {
                // Best-effort cleanup of the partially written snapshot. If the cleanup itself fails, the returned future
                // still completes with the original exception.
                IgniteUtils.deleteIfExists(tmpPath);

                return;
            }

            // Delete snapshot directory if it already exists
            IgniteUtils.deleteIfExists(snapshotDir);

            try {
                // Rename the temporary directory
                IgniteUtils.atomicMoveFile(tmpPath, snapshotDir, null);
            } catch (IOException ex) {
                throw new IgniteInternalException("Failed to rename: " + tmpPath + " to " + snapshotDir, ex);
            }
        }, executor);
    }

    /**
     * Releases the point-in-time snapshot that was taken from the database.
     *
     * @param snapshot Snapshot to release.
     */
    private void releaseSnapshot(Snapshot snapshot) {
        db.releaseSnapshot(snapshot);

        // Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
        // database does. Calling close to maintain the AutoCloseable semantics
        snapshot.close();
    }

    /**
     * Creates a temporary directory for storing intermediate results while creating a snapshot.
     * If the directory already exists, it is deleted first.
     *
     * @param tmpDirPath Path to the temporary directory.
     * @throws IgniteInternalException If the directory could not be created.
     */
    private static void createTmpSnapshotDir(Path tmpDirPath) {
        IgniteUtils.deleteIfExists(tmpDirPath);

        try {
            Files.createDirectories(tmpDirPath);
        } catch (IOException e) {
            throw new IgniteInternalException("Failed to create directory: " + tmpDirPath, e);
        }
    }

    /**
     * Creates an SST file for the column family (async version).
     *
     * @param range Column family range.
     * @param snapshot Point-in-time snapshot.
     * @param snapshotDir Directory to put the SST file in.
     * @return Future that is completed when the SST file has been written.
     */
    private CompletableFuture<Void> createSstFileAsync(ColumnFamilyRange range, Snapshot snapshot, Path snapshotDir) {
        return CompletableFuture.runAsync(() -> createSstFile(range, snapshot, snapshotDir), executor);
    }

    /**
     * Creates an SST file for the column family.
     *
     * <p>The file is named after the column family and contains all entries of the provided range that are visible through the
     * provided snapshot.
     *
     * @param range Column family range.
     * @param snapshot Point-in-time snapshot.
     * @param snapshotDir Directory to put the SST file in.
     * @throws IgniteInternalException If RocksDB failed to write the file.
     */
    private void createSstFile(ColumnFamilyRange range, Snapshot snapshot, Path snapshotDir) {
        try (
                EnvOptions envOptions = new EnvOptions();
                Options options = new Options().setEnv(db.getEnv());
                SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options);
                // A "null" resource is allowed in try-with-resources and is simply skipped on close.
                Slice upperBound = range.isFullRange() ? null : new Slice(range.upperBound());
                ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot)
        ) {
            if (upperBound != null) {
                readOptions.setIterateUpperBound(upperBound);
            }

            // The iterator is opened in a nested block, so that it is closed before the read options and the upper bound that
            // it has been created with.
            try (RocksIterator it = range.columnFamily().newIterator(readOptions)) {
                if (upperBound == null) {
                    it.seekToFirst();
                } else {
                    it.seek(range.lowerBound());
                }

                Path sstFile = snapshotDir.resolve(range.columnFamily().name());

                sstFileWriter.open(sstFile.toString());

                RocksUtils.forEach(it, sstFileWriter::put);

                // NOTE(review): SstFileWriter#finish presumably fails for a file without entries - confirm whether empty ranges
                // can be passed here.
                sstFileWriter.finish();
            }
        } catch (RocksDBException e) {
            throw new IgniteInternalException("Failed to write snapshot", e);
        }
    }

    /**
     * Restores the snapshot that was created by {@link #createSnapshot}.
     *
     * <p>This method loads the snapshot as-is, overwriting the existing keys if necessary. Most of the time storage implementations
     * should manually remove all data before restoring a snapshot.
     *
     * @param snapshotDir Path to the directory where a snapshot was created.
     * @throws IgniteInternalException If an SST file of any configured range is missing or could not be ingested.
     */
    public void restoreSnapshot(Path snapshotDir) {
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            for (ColumnFamilyRange range : ranges) {
                Path snapshotPath = snapshotDir.resolve(range.columnFamily().name());

                if (!Files.exists(snapshotPath)) {
                    throw new IgniteInternalException("Snapshot not found: " + snapshotPath);
                }

                range.columnFamily().ingestExternalFile(List.of(snapshotPath.toString()), ingestOptions);
            }
        } catch (RocksDBException e) {
            throw new IgniteInternalException("Fail to ingest sst file at path: " + snapshotDir, e);
        }
    }
}