/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
* streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
* checkpointing. This state backend can store very large state that exceeds memory and spills
* to disk. Except for snapshotting, this class should be accessed as if it were not thread-safe.
*
* <p>This class follows the rules for closing/releasing native RocksDB resources as described in
* <a href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families">
* this document</a>.
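*
* <p>A minimal sketch of the lifecycle as driven by the surrounding runtime (the identifiers for the
* restore handles, checkpoint id, timestamp, stream factory, and options are assumed to be supplied by it):
* <pre>{@code
* backend.restore(restoreStateHandles); // or an empty collection for a fresh start
* RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFuture =
*     backend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
* // ... run the future, possibly in an asynchronous part, then eventually:
* backend.dispose();
* }</pre>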
*/
public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
/** The name of the merge operator in RocksDB. Do not change this unless you know exactly what you are doing. */
public static final String MERGE_OPERATOR_NAME = "stringappendtest";
/** File suffix of sstable files. */
private static final String SST_FILE_SUFFIX = ".sst";
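// Dispatch table from state descriptor class to the factory that creates the matching RocksDB-backed state
// (used by createInternalState).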
private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
Stream.of(
Tuple2.of(ValueStateDescriptor.class, (StateFactory) RocksDBValueState::create),
Tuple2.of(ListStateDescriptor.class, (StateFactory) RocksDBListState::create),
Tuple2.of(MapStateDescriptor.class, (StateFactory) RocksDBMapState::create),
Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) RocksDBAggregatingState::create),
Tuple2.of(ReducingStateDescriptor.class, (StateFactory) RocksDBReducingState::create),
Tuple2.of(FoldingStateDescriptor.class, (StateFactory) RocksDBFoldingState::create)
).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
private interface StateFactory {
<K, N, SV, S extends State, IS extends S> IS createState(
StateDescriptor<S, SV> stateDesc,
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
RocksDBKeyedStateBackend<K> backend) throws Exception;
}
/** String that identifies the operator that owns this backend. */
private final String operatorIdentifier;
/** The column family options from the options factory. */
private final ColumnFamilyOptions columnOptions;
/** The DB options from the options factory. */
private final DBOptions dbOptions;
/** Path where this configured instance stores its data directory. */
private final File instanceBasePath;
/** Path where this configured instance stores its RocksDB database. */
private final File instanceRocksDBPath;
/**
* Protects access to RocksDB from other threads, like the checkpointing thread, against a parallel
* call that disposes the RocksDB object.
*/
private final ResourceGuard rocksDBResourceGuard;
/**
* Our RocksDB database. It is used by the actual subclasses of {@link AbstractRocksDBState}
* to store state. The different k/v states that we have do not each have their own RocksDB
* instance; they all write to this instance, each into its own column family.
*/
protected RocksDB db;
/**
* We are not using the default column family for Flink state ops, but we still need to remember this handle so that
* we can close it properly when the backend is closed. This is required by RocksDB's native memory management.
*/
private ColumnFamilyHandle defaultColumnFamily;
/**
* The write options to use in the states. We disable write ahead logging.
*/
private final WriteOptions writeOptions;
/**
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
/**
* Map of state names to their corresponding restored state meta info.
*
* <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, StateMetaInfoSnapshot> restoredKvStateMetaInfos;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
/** True if incremental checkpointing is enabled. */
private final boolean enableIncrementalCheckpointing;
/** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
/** The identifier of the last completed checkpoint. */
private long lastCompletedCheckpointId = -1L;
/** Unique ID of this backend. */
private UUID backendUID;
/** The configuration of local recovery. */
private final LocalRecoveryConfig localRecoveryConfig;
/** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */
private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
/** Factory for priority queue state. */
private final PriorityQueueSetFactory priorityQueueFactory;
/** Shared wrapper for batch writes to the RocksDB instance. */
private RocksDBWriteBatchWrapper writeBatchWrapper;
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
File instanceBasePath,
DBOptions dbOptions,
ColumnFamilyOptions columnFamilyOptions,
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
boolean enableIncrementalCheckpointing,
LocalRecoveryConfig localRecoveryConfig,
RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
TtlTimeProvider ttlTimeProvider
) throws IOException {
super(kvStateRegistry, keySerializer, userCodeClassLoader,
numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.rocksDBResourceGuard = new ResourceGuard();
// ensure that we use the right merge operator, because other code relies on this
this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions)
.setMergeOperatorName(MERGE_OPERATOR_NAME);
this.dbOptions = Preconditions.checkNotNull(dbOptions);
this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
this.instanceRocksDBPath = new File(instanceBasePath, "db");
checkAndCreateDirectory(instanceBasePath);
if (instanceRocksDBPath.exists()) {
// Clear the base directory when the backend is created
// in case something crashed and the backend never reached dispose()
cleanInstanceBasePath();
}
this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
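// the serialized key-group prefix takes one or two bytes, depending on the total number of key-groups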
this.keyGroupPrefixBytes =
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
this.kvStateInformation = new LinkedHashMap<>();
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
this.snapshotStrategy = enableIncrementalCheckpointing ?
new IncrementalSnapshotStrategy() :
new FullSnapshotStrategy();
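// Flink's checkpoints already provide durability, so the RocksDB write-ahead log would only add redundant writes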
this.writeOptions = new WriteOptions().setDisableWAL(true);
switch (priorityQueueStateType) {
case HEAP:
this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
break;
case ROCKSDB:
this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
break;
default:
throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
}
LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
}
private static void checkAndCreateDirectory(File directory) throws IOException {
if (directory.exists()) {
if (!directory.isDirectory()) {
throw new IOException("Not a directory: " + directory);
}
} else {
if (!directory.mkdirs()) {
throw new IOException(
String.format("Could not create RocksDB data directory at %s.", directory));
}
}
}
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = kvStateInformation.get(state);
if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
namespaceOutputStream,
new DataOutputViewStreamWrapper(namespaceOutputStream),
ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputStream.toByteArray();
} catch (IOException ex) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
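// iterate over the state's column family and emit only keys whose serialized namespace matches nameSpaceBytes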
RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
iterator.seekToFirst();
final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
return targetStream.onClose(iteratorWrapper::close);
}
@VisibleForTesting
ColumnFamilyHandle getColumnFamilyHandle(String state) {
Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
return columnInfo != null ? columnInfo.f0 : null;
}
/**
* Should only be called by one thread, and only after all accesses to the DB happened.
*/
@Override
public void dispose() {
super.dispose();
// This call will block until all clients that still hold access to the RocksDB instance have released it,
// so that the native resources cannot be released while clients are still working with the database in parallel.
rocksDBResourceGuard.close();
// IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as
// working on the disposed object results in SEGFAULTS.
if (db != null) {
IOUtils.closeQuietly(writeBatchWrapper);
// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
// DB is closed. See:
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
// Start with default CF ...
IOUtils.closeQuietly(defaultColumnFamily);
// ... continue with the ones created by Flink...
for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData :
kvStateInformation.values()) {
IOUtils.closeQuietly(columnMetaData.f0);
}
// ... and finally close the DB instance ...
IOUtils.closeQuietly(db);
// invalidate the reference
db = null;
IOUtils.closeQuietly(columnOptions);
IOUtils.closeQuietly(dbOptions);
IOUtils.closeQuietly(writeOptions);
kvStateInformation.clear();
restoredKvStateMetaInfos.clear();
cleanInstanceBasePath();
}
}
@Nonnull
@Override
public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
create(
@Nonnull String stateName,
@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
}
private void cleanInstanceBasePath() {
LOG.info("Deleting existing instance base directory {}.", instanceBasePath);
try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (IOException ex) {
LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex);
}
}
public int getKeyGroupPrefixBytes() {
return keyGroupPrefixBytes;
}
@VisibleForTesting
PriorityQueueSetFactory getPriorityQueueFactory() {
return priorityQueueFactory;
}
public WriteOptions getWriteOptions() {
return writeOptions;
}
/**
* Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
* be called by the same thread.
*
* @param checkpointId The Id of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
* @param checkpointOptions Options for how to perform this checkpoint.
* @return Future to the state handle of the snapshot data.
* @throws Exception indicating a problem in the synchronous part of the checkpoint.
*/
@Override
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
// flush everything into db before taking a snapshot
writeBatchWrapper.flush();
return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
}
@Override
public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
LOG.info("Initializing RocksDB keyed state backend.");
if (LOG.isDebugEnabled()) {
LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
}
// clear all meta data
kvStateInformation.clear();
restoredKvStateMetaInfos.clear();
try {
if (restoreState == null || restoreState.isEmpty()) {
createDB();
} else {
KeyedStateHandle firstStateHandle = restoreState.iterator().next();
if (firstStateHandle instanceof IncrementalKeyedStateHandle
|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
restoreOperation.restore(restoreState);
} else {
RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
restoreOperation.doRestore(restoreState);
}
}
} catch (Exception ex) {
dispose();
throw ex;
}
}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
if (!enableIncrementalCheckpointing) {
return;
}
synchronized (materializedSstFiles) {
if (completedCheckpointId < lastCompletedCheckpointId) {
return;
}
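// retain only the sst file sets of this and any later (still pending) checkpoints; older entries can
// no longer serve as the base for future incremental snapshots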
materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
lastCompletedCheckpointId = completedCheckpointId;
}
}
private void createDB() throws IOException {
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
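// no state column families are registered yet, so openDB returns only the default column family handle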
this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
this.defaultColumnFamily = columnFamilyHandles.get(0);
}
private RocksDB openDB(
String path,
List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
// we add the required descriptor for the default CF in FIRST position, see
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
RocksDB dbRef;
try {
dbRef = RocksDB.open(
Preconditions.checkNotNull(dbOptions),
Preconditions.checkNotNull(path),
columnFamilyDescriptors,
stateColumnFamilyHandles);
} catch (RocksDBException e) {
throw new IOException("Error while opening RocksDB instance.", e);
}
// requested + default CF
Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
"Not all requested column family handles have been created");
return dbRef;
}
/**
* Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot.
*/
private static final class RocksDBFullRestoreOperation<K> {
private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
/** Current key-groups state handle from which we restore key-groups. */
private KeyGroupsStateHandle currentKeyGroupsStateHandle;
/** Current input stream we obtained from currentKeyGroupsStateHandle. */
private FSDataInputStream currentStateHandleInStream;
/** Current data input view that wraps currentStateHandleInStream. */
private DataInputView currentStateHandleInView;
/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
/** The compression decorator that was used for writing the state, as determined by the meta data. */
private StreamCompressionDecorator keygroupStreamCompressionDecorator;
/**
* Creates a restore operation object for the given state backend instance.
*
* @param rocksDBKeyedStateBackend the state backend into which we restore
*/
public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
}
/**
* Restores all key-group data that is referenced by the passed state handles.
*
* @param keyedStateHandles List of all key groups state handles that shall be restored.
*/
public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
throws IOException, StateMigrationException, RocksDBException {
rocksDBKeyedStateBackend.createDB();
for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
if (keyedStateHandle != null) {
if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
"expected: " + KeyGroupsStateHandle.class +
", but found: " + keyedStateHandle.getClass());
}
this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
restoreKeyGroupsInStateHandle();
}
}
}
/**
* Restore one key groups state handle.
*/
private void restoreKeyGroupsInStateHandle()
throws IOException, StateMigrationException, RocksDBException {
try {
currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
restoreKVStateData();
} finally {
if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
}
/**
* Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
*/
private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
// deserialization of state happens lazily during runtime; we depend on the fact
// that the new serializer for states could be compatible, and therefore the restore can continue
// without old serializers required to be present.
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
serializationProxy.read(currentStateHandleInView);
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
if (CompatibilityUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
UnloadableDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
rocksDBKeyedStateBackend.keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available");
}
this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
List<StateMetaInfoSnapshot> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
if (registeredColumn == null) {
byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
nameBytes,
rocksDBKeyedStateBackend.columnOptions);
RegisteredStateMetaInfoBase stateMetaInfo =
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
} else {
// TODO with eager state registration in place, check here for serializer migration strategies
}
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
}
}
/**
* Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
*/
private void restoreKVStateData() throws IOException, RocksDBException {
//for all key-groups in the current state handle...
try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
int keyGroup = keyGroupOffset.f0;
// Check that restored key groups all belong to the backend
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
"The key group must belong to the backend");
long offset = keyGroupOffset.f1;
//not empty key-group?
if (0L != offset) {
currentStateHandleInStream.seek(offset);
try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
int kvStateId = compressedKgInputView.readShort();
ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
//insert all k/v pairs into DB
boolean keyGroupHasMoreKeys = true;
while (keyGroupHasMoreKeys) {
byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
//clear the signal bit in the key to make it ready for insertion again
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
writeBatchWrapper.put(handle, key, value);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
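// mask the signed short read to an unsigned kv-state id; the END_OF_KEY_GROUP_MARK value signals
// that this key-group contains no further entries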
kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
& compressedKgInputView.readShort();
if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
keyGroupHasMoreKeys = false;
} else {
handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
}
} else {
writeBatchWrapper.put(handle, key, value);
}
}
}
}
}
}
}
}
/**
* Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot.
*/
private static class RocksDBIncrementalRestoreOperation<T> {
private final RocksDBKeyedStateBackend<T> stateBackend;
private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
this.stateBackend = stateBackend;
}
/**
* Root method that branches for different implementations of {@link KeyedStateHandle}.
*/
void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
if (restoreStateHandles.isEmpty()) {
return;
}
final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
boolean isRescaling = (restoreStateHandles.size() > 1 ||
!Objects.equals(theFirstStateHandle.getKeyGroupRange(), stateBackend.keyGroupRange));
if (!isRescaling) {
restoreWithoutRescaling(theFirstStateHandle);
} else {
restoreWithRescaling(restoreStateHandles);
}
}
/**
* Recovery from a single remote incremental state without rescaling.
*/
void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
IncrementalLocalKeyedStateHandle localKeyedStateHandle;
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
List<ColumnFamilyDescriptor> columnFamilyDescriptors;
// Recovery from remote incremental state.
Path temporaryRestoreInstancePath = new Path(
stateBackend.instanceBasePath.getAbsolutePath(),
UUID.randomUUID().toString());
try {
if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
IncrementalKeyedStateHandle restoreStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;
// read state data.
transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
stateMetaInfoSnapshots = readMetaData(restoreStateHandle.getMetaStateHandle());
columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
// since we transferred all remote state to a local directory, we can use the same code as for
// local recovery.
localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
restoreStateHandle.getBackendIdentifier(),
restoreStateHandle.getCheckpointId(),
new DirectoryStateHandle(temporaryRestoreInstancePath),
restoreStateHandle.getKeyGroupRange(),
restoreStateHandle.getMetaStateHandle(),
restoreStateHandle.getSharedState().keySet());
} else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
// Recovery from local incremental state.
localKeyedStateHandle = (IncrementalLocalKeyedStateHandle) rawStateHandle;
stateMetaInfoSnapshots = readMetaData(localKeyedStateHandle.getMetaDataState());
columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
} else {
throw new IllegalStateException("Unexpected state handle type, " +
"expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
", but found " + rawStateHandle.getClass());
}
restoreLocalStateIntoFullInstance(
localKeyedStateHandle,
columnFamilyDescriptors,
stateMetaInfoSnapshots);
} finally {
FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
}
}
}
/**
* Recovery from multiple incremental states with rescaling. For rescaling, this method creates a temporary
* RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the
* real restore instance and then the temporary instance is discarded.
*/
void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
initTargetDB(restoreStateHandles, stateBackend.keyGroupRange);
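// serialize the begin (inclusive) and end (exclusive) key-group of the target range; these prefixes
// bound the records that are copied from each temporary restore instance below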
byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getStartKeyGroup(), startKeyGroupPrefixBytes);
byte[] stopKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
"expected " + IncrementalKeyedStateHandle.class +
", but found " + rawStateHandle.getClass());
}
Path temporaryRestoreInstancePath = new Path(stateBackend.instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
(IncrementalKeyedStateHandle) rawStateHandle,
temporaryRestoreInstancePath);
RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) {
List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
// iterating only the requested descriptors automatically skips the default column family handle
for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i);
ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle(
tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));
try (RocksIteratorWrapper iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
iterator.seek(startKeyGroupPrefixBytes);
while (iterator.isValid()) {
if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
} else {
// Since the iterator will visit the record according to the sorted order,
// we can just break here.
break;
}
iterator.next();
}
} // releases native iterator resources
}
} finally {
FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
}
}
}
}
private class RestoredDBInstance implements AutoCloseable {
@Nonnull
private final RocksDB db;
@Nonnull
private final ColumnFamilyHandle defaultColumnFamilyHandle;
@Nonnull
private final List<ColumnFamilyHandle> columnFamilyHandles;
@Nonnull
private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
@Nonnull
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
RestoredDBInstance(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
this.db = db;
this.columnFamilyHandles = columnFamilyHandles;
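// openDB returns the handle of the default column family in first position; detach it so the list
// contains only the state column families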
this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0);
this.columnFamilyDescriptors = columnFamilyDescriptors;
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
}
@Override
public void close() {
IOUtils.closeQuietly(defaultColumnFamilyHandle);
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
IOUtils.closeQuietly(columnFamilyHandle);
}
IOUtils.closeQuietly(db);
}
}
private RestoredDBInstance restoreDBInstanceFromStateHandle(
IncrementalKeyedStateHandle restoreStateHandle,
Path temporaryRestoreInstancePath) throws Exception {
transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
// read meta data
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
readMetaData(restoreStateHandle.getMetaStateHandle());
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
List<ColumnFamilyHandle> columnFamilyHandles =
new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
RocksDB restoreDb = stateBackend.openDB(
temporaryRestoreInstancePath.getPath(),
columnFamilyDescriptors,
columnFamilyHandles);
return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots);
}
private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
ColumnFamilyDescriptor columnFamilyDescriptor,
ColumnFamilyHandle columnFamilyHandle,
StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredStateMetaInfoEntry =
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
if (null == registeredStateMetaInfoEntry) {
RegisteredStateMetaInfoBase stateMetaInfo =
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
registeredStateMetaInfoEntry =
new Tuple2<>(
columnFamilyHandle != null ? columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
stateMetaInfo);
stateBackend.kvStateInformation.put(
stateMetaInfoSnapshot.getName(),
registeredStateMetaInfoEntry);
}
return registeredStateMetaInfoEntry.f0;
}
/**
* This method first tries to find an initial handle with which to initialize the target db. If such a handle
* is found, we initialize the target db with it and clip the db to the target key-group range. Otherwise,
* we create an empty db as the target db.
*/
private void initTargetDB(
Collection<KeyedStateHandle> restoreStateHandles,
KeyGroupRange targetKeyGroupRange) throws Exception {
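// pick the state handle whose key-group range best matches the target range as the initial db; its
// contents can then be clipped to the target range in place instead of being copied record by record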
IncrementalKeyedStateHandle initialHandle = (IncrementalKeyedStateHandle) RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
restoreStateHandles, targetKeyGroupRange);
if (initialHandle != null) {
restoreStateHandles.remove(initialHandle);
RestoredDBInstance restoreDBInfo = null;
Path instancePath = new Path(stateBackend.instanceRocksDBPath.getAbsolutePath());
try {
restoreDBInfo = restoreDBInstanceFromStateHandle(
initialHandle,
instancePath);
RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
restoreDBInfo.db,
restoreDBInfo.columnFamilyHandles,
targetKeyGroupRange,
initialHandle.getKeyGroupRange(),
stateBackend.keyGroupPrefixBytes);
stateBackend.db = restoreDBInfo.db;
stateBackend.defaultColumnFamily = restoreDBInfo.defaultColumnFamilyHandle;
stateBackend.writeBatchWrapper =
new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
for (int i = 0; i < restoreDBInfo.stateMetaInfoSnapshots.size(); ++i) {
getOrRegisterColumnFamilyHandle(
restoreDBInfo.columnFamilyDescriptors.get(i),
restoreDBInfo.columnFamilyHandles.get(i),
restoreDBInfo.stateMetaInfoSnapshots.get(i));
}
} catch (Exception e) {
if (restoreDBInfo != null) {
restoreDBInfo.close();
}
FileSystem restoreFileSystem = instancePath.getFileSystem();
if (restoreFileSystem.exists(instancePath)) {
restoreFileSystem.delete(instancePath, true);
}
throw e;
}
} else {
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
stateBackend.db = stateBackend.openDB(
stateBackend.instanceRocksDBPath.getAbsolutePath(),
Collections.emptyList(),
columnFamilyHandles);
stateBackend.defaultColumnFamily = columnFamilyHandles.get(0);
stateBackend.writeBatchWrapper =
new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
}
}
/**
* This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state meta data snapshot.
*/
private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(stateMetaInfoSnapshots.size());
for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
stateBackend.columnOptions);
columnFamilyDescriptors.add(columnFamilyDescriptor);
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
}
return columnFamilyDescriptors;
}
/**
* This method implements the core of the restore logic that unifies how local and remote state are recovered.
*/
private void restoreLocalStateIntoFullInstance(
IncrementalLocalKeyedStateHandle restoreStateHandle,
List<ColumnFamilyDescriptor> columnFamilyDescriptors,
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception {
// pick up the old backend id again, so that we can reference existing state
stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
stateBackend.operatorIdentifier, stateBackend.backendUID);
// create hard links in the instance directory
if (!stateBackend.instanceRocksDBPath.mkdirs()) {
throw new IOException("Could not create RocksDB data directory.");
}
Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory();
restoreInstanceDirectoryFromPath(restoreSourcePath);
List<ColumnFamilyHandle> columnFamilyHandles =
new ArrayList<>(1 + columnFamilyDescriptors.size());
stateBackend.db = stateBackend.openDB(
stateBackend.instanceRocksDBPath.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
// extract and store the default column family which is located at the first index
stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);
stateBackend.writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
RegisteredStateMetaInfoBase stateMetaInfo =
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
stateBackend.kvStateInformation.put(
stateMetaInfoSnapshot.getName(),
new Tuple2<>(columnFamilyHandle, stateMetaInfo));
}
// use the restore sst files as the base for succeeding checkpoints
synchronized (stateBackend.materializedSstFiles) {
stateBackend.materializedSstFiles.put(
restoreStateHandle.getCheckpointId(),
restoreStateHandle.getSharedStateHandleIDs());
}
stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
}
/**
* This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from
* a local state.
*/
private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
FileSystem fileSystem = source.getFileSystem();
final FileStatus[] fileStatuses = fileSystem.listStatus(source);
if (fileStatuses == null) {
throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
}
for (FileStatus fileStatus : fileStatuses) {
final Path filePath = fileStatus.getPath();
final String fileName = filePath.getName();
File restoreFile = new File(source.getPath(), fileName);
File targetFile = new File(stateBackend.instanceRocksDBPath.getPath(), fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
// hard-link the immutable sst-files.
Files.createLink(targetFile.toPath(), restoreFile.toPath());
} else {
// true copy for all other files.
Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
}
}
/**
* Reads Flink's state meta data file from the state handle.
*/
private List<StateMetaInfoSnapshot> readMetaData(
StreamStateHandle metaStateHandle) throws Exception {
FSDataInputStream inputStream = null;
try {
inputStream = metaStateHandle.openInputStream();
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
// deserialization of state happens lazily during runtime; we depend on the fact
// that the new serializer for states could be compatible, and therefore the restore can continue
// without old serializers required to be present.
KeyedBackendSerializationProxy<T> serializationProxy =
new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader, false);
DataInputView in = new DataInputViewStreamWrapper(inputStream);
serializationProxy.read(in);
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
if (CompatibilityUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
UnloadableDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
stateBackend.keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available");
}
return serializationProxy.getStateMetaInfoSnapshots();
} finally {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
}
}
private void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest) throws IOException {
final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();
transferAllDataFromStateHandles(sstFiles, dest);
transferAllDataFromStateHandles(miscFiles, dest);
}
/**
* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
* {@link StateHandleID}.
*/
private void transferAllDataFromStateHandles(
Map<StateHandleID, StreamStateHandle> stateHandleMap,
Path restoreInstancePath) throws IOException {
for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
StateHandleID stateHandleID = entry.getKey();
StreamStateHandle remoteFileHandle = entry.getValue();
copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
}
}
/**
* Copies the file from a single state handle to the given path.
*/
private void copyStateDataHandleData(
Path restoreFilePath,
StreamStateHandle remoteFileHandle) throws IOException {
FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
FSDataInputStream inputStream = null;
FSDataOutputStream outputStream = null;
try {
inputStream = remoteFileHandle.openInputStream();
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
byte[] buffer = new byte[8 * 1024];
while (true) {
int numBytes = inputStream.read(buffer);
if (numBytes == -1) {
break;
}
outputStream.write(buffer, 0, numBytes);
}
} finally {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
outputStream.close();
}
}
}
}
// ------------------------------------------------------------------------
// State factories
// ------------------------------------------------------------------------
/**
* Registers k/v state information, which includes the state id, type, RocksDB column family handle, and serializers.
*
* <p>When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
* the list of k/v state information. When a k/v state is first requested we check here whether we
* already have a registered entry for that and return it (after some necessary state compatibility checks)
* or create a new one if it does not exist.
*/
private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
StateDescriptor<?, S> stateDesc,
TypeSerializer<N> namespaceSerializer,
@Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {
Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
kvStateInformation.get(stateDesc.getName());
RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
if (stateInfo != null) {
StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());
Preconditions.checkState(
restoredMetaInfoSnapshot != null,
"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
" but its corresponding restored snapshot cannot be found.");
newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
restoredMetaInfoSnapshot,
namespaceSerializer,
stateDesc,
snapshotTransformer);
stateInfo.f1 = newMetaInfo;
} else {
String stateName = stateDesc.getName();
newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateName,
namespaceSerializer,
stateDesc.getSerializer(),
snapshotTransformer);
ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
stateInfo = Tuple2.of(columnFamily, newMetaInfo);
kvStateInformation.put(stateDesc.getName(), stateInfo);
}
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
/**
* Creates a column family handle for use with a k/v state.
*/
private ColumnFamilyHandle createColumnFamily(String stateName) {
byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
try {
return db.createColumnFamily(columnDescriptor);
} catch (RocksDBException e) {
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);
}
}
@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult = tryRegisterKvStateInformation(
stateDesc, namespaceSerializer, getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
}
@SuppressWarnings("unchecked")
private <SV, SEV> StateSnapshotTransformer<SV> getStateSnapshotTransformer(
StateDescriptor<?, SV> stateDesc,
StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
if (stateDesc instanceof ListStateDescriptor) {
Optional<StateSnapshotTransformer<SEV>> original = snapshotTransformFactory.createForDeserializedState();
return original.map(est -> createRocksDBListStateTransformer(stateDesc, est)).orElse(null);
} else {
Optional<StateSnapshotTransformer<byte[]>> original = snapshotTransformFactory.createForSerializedState();
return (StateSnapshotTransformer<SV>) original.orElse(null);
}
}
@SuppressWarnings("unchecked")
private <SV, SEV> StateSnapshotTransformer<SV> createRocksDBListStateTransformer(
StateDescriptor<?, SV> stateDesc,
StateSnapshotTransformer<SEV> elementTransformer) {
return (StateSnapshotTransformer<SV>) new RocksDBListState.StateSnapshotTransformerWrapper<>(
elementTransformer, ((ListStateDescriptor<SEV>) stateDesc).getElementSerializer());
}
/**
* Only visible for testing, DO NOT USE.
*/
public File getInstanceBasePath() {
return instanceBasePath;
}
@Override
public boolean supportsAsynchronousSnapshots() {
return true;
}
@VisibleForTesting
@SuppressWarnings("unchecked")
@Override
public int numKeyValueStateEntries() {
int count = 0;
for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : kvStateInformation.values()) {
//TODO maybe filterOrTransform only for k/v states
try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
rocksIterator.seekToFirst();
while (rocksIterator.isValid()) {
count++;
rocksIterator.next();
}
}
}
return count;
}
/**
* Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
* The resulting iteration sequence is ordered by (key-group, kv-state).
*/
@VisibleForTesting
static class RocksDBMergeIterator implements AutoCloseable {
private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
private final int keyGroupPrefixByteCount;
private boolean newKeyGroup;
private boolean newKVState;
private boolean valid;
MergeIterator currentSubIterator;
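// One comparator per supported key-group prefix length (1 or 2 bytes): compare the key-group prefix
// first, then break ties by kv-state id.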
private static final List<Comparator<MergeIterator>> COMPARATORS;
static {
int maxBytes = 2;
COMPARATORS = new ArrayList<>(maxBytes);
for (int i = 0; i < maxBytes; ++i) {
final int currentBytes = i + 1;
COMPARATORS.add((o1, o2) -> {
int arrayCmpRes = compareKeyGroupsForByteArrays(
o1.currentKey, o2.currentKey, currentBytes);
return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
});
}
}
RocksDBMergeIterator(
List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
final int keyGroupPrefixByteCount) {
Preconditions.checkNotNull(kvStateIterators);
Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
if (kvStateIterators.size() > 0) {
PriorityQueue<MergeIterator> iteratorPriorityQueue =
new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
rocksIterator.seekToFirst();
if (rocksIterator.isValid()) {
iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
} else {
IOUtils.closeQuietly(rocksIterator);
}
}
kvStateIterators.clear();
this.heap = iteratorPriorityQueue;
this.valid = !heap.isEmpty();
this.currentSubIterator = heap.poll();
} else {
// creating a PriorityQueue of size 0 results in an exception.
this.heap = null;
this.valid = false;
}
this.newKeyGroup = true;
this.newKVState = true;
}
/**
* Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only change after
* calls to {@link #next()}.
*/
public void next() {
newKeyGroup = false;
newKVState = false;
final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
rocksIterator.next();
byte[] oldKey = currentSubIterator.getCurrentKey();
if (rocksIterator.isValid()) {
currentSubIterator.currentKey = rocksIterator.key();
if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
heap.offer(currentSubIterator);
currentSubIterator = heap.poll();
newKVState = currentSubIterator.getIterator() != rocksIterator;
detectNewKeyGroup(oldKey);
}
} else {
IOUtils.closeQuietly(rocksIterator);
if (heap.isEmpty()) {
currentSubIterator = null;
valid = false;
} else {
currentSubIterator = heap.poll();
newKVState = true;
detectNewKeyGroup(oldKey);
}
}
}
private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
}
private void detectNewKeyGroup(byte[] oldKey) {
if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
newKeyGroup = true;
}
}
/**
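* Returns the key-group of the current key, decoded big-endian from the first
* {@code keyGroupPrefixByteCount} bytes of the key. For example, with two prefix bytes
* {@code 0x01 0x02}, the key-group is {@code (0x01 << 8) | 0x02 == 258}.
*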
* @return key-group for the current key
*/
public int keyGroup() {
int result = 0;
//big endian decode
for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
result <<= 8;
result |= (currentSubIterator.currentKey[i] & 0xFF);
}
return result;
}
public byte[] key() {
return currentSubIterator.getCurrentKey();
}
public byte[] value() {
return currentSubIterator.getIterator().value();
}
/**
* @return Id of K/V state to which the current key belongs.
*/
public int kvStateId() {
return currentSubIterator.getKvStateId();
}
/**
* Indicates whether the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
* @return true iff the current key belongs to a different k/v-state than its predecessor.
*/
public boolean isNewKeyValueState() {
return newKVState;
}
/**
* Indicates whether the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
* @return true iff the current key belongs to a different key-group than its predecessor.
*/
public boolean isNewKeyGroup() {
return newKeyGroup;
}
/**
* Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
* {@link #next()} should only be called if this method returned true, and it should be re-checked after
* each call to {@link #next()} before accessing iterator state.
* @return True iff this iterator is valid.
*/
public boolean isValid() {
return valid;
}
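/**
* Compares the first {@code len} bytes of the given arrays as unsigned values, i.e.
* lexicographically over the key-group prefix. Returns a negative, zero, or positive value
* in the style of {@link java.util.Comparator#compare(Object, Object)}.
*/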
private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
for (int i = 0; i < len; ++i) {
int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
if (diff != 0) {
return diff;
}
}
return 0;
}
@Override
public void close() {
IOUtils.closeQuietly(currentSubIterator);
currentSubIterator = null;
// the heap is null if this merge iterator was created from an empty list of sub-iterators
if (heap != null) {
IOUtils.closeAllQuietly(heap);
heap.clear();
}
}
}
/**
* Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
* Used by {@link RocksDBMergeIterator}.
*/
@VisibleForTesting
protected static final class MergeIterator implements AutoCloseable {
/**
* @param iterator The {@link RocksIteratorWrapper} to wrap.
* @param kvStateId Id of the K/V state to which this iterator belongs.
*/
MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
this.iterator = Preconditions.checkNotNull(iterator);
this.currentKey = iterator.key();
this.kvStateId = kvStateId;
}
private final RocksIteratorWrapper iterator;
private byte[] currentKey;
private final int kvStateId;
public byte[] getCurrentKey() {
return currentKey;
}
public void setCurrentKey(byte[] currentKey) {
this.currentKey = currentKey;
}
public RocksIteratorWrapper getIterator() {
return iterator;
}
public int getKvStateId() {
return kvStateId;
}
@Override
public void close() {
IOUtils.closeQuietly(iterator);
}
}
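/**
* A {@link RocksIteratorWrapper} that applies a {@link StateSnapshotTransformer} to each value
* during iteration: entries for which the transformer returns {@code null} are skipped, and
* {@link #value()} returns the transformed bytes of the current entry.
*/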
private static final class TransformingRocksIteratorWrapper extends RocksIteratorWrapper {
@Nonnull
private final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;
private byte[] current;
public TransformingRocksIteratorWrapper(
@Nonnull RocksIterator iterator,
@Nonnull StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
super(iterator);
this.stateSnapshotTransformer = stateSnapshotTransformer;
}
@Override
public void seekToFirst() {
super.seekToFirst();
filterOrTransform(super::next);
}
@Override
public void seekToLast() {
super.seekToLast();
filterOrTransform(super::prev);
}
@Override
public void next() {
super.next();
filterOrTransform(super::next);
}
@Override
public void prev() {
super.prev();
filterOrTransform(super::prev);
}
private void filterOrTransform(Runnable advance) {
while (isValid() && (current = stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
advance.run();
}
}
@Override
public byte[] value() {
if (!isValid()) {
throw new IllegalStateException("value() method cannot be called if isValid() is false");
}
return current;
}
}
/**
* Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
* is not thread safe.
*
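* <p>A usage sketch (illustrative only; the constructor arguments are assumed to be in scope):
* <pre>{@code
* try (RocksIteratorForKeysWrapper<String> keys = new RocksIteratorForKeysWrapper<>(
*         iterator, "my-state", keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, namespaceBytes)) {
*     while (keys.hasNext()) {
*         String key = keys.next();
*     }
* }
* }</pre>
*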
* @param <K> the type of the iterated objects, which are keys in RocksDB.
*/
static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable {
private final RocksIteratorWrapper iterator;
private final String state;
private final TypeSerializer<K> keySerializer;
private final int keyGroupPrefixBytes;
private final byte[] namespaceBytes;
private final boolean ambiguousKeyPossible;
private K nextKey;
private K previousKey;
RocksIteratorForKeysWrapper(
RocksIteratorWrapper iterator,
String state,
TypeSerializer<K> keySerializer,
int keyGroupPrefixBytes,
boolean ambiguousKeyPossible,
byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
this.nextKey = null;
this.previousKey = null;
this.ambiguousKeyPossible = ambiguousKeyPossible;
}
@Override
public boolean hasNext() {
try {
while (nextKey == null && iterator.isValid()) {
byte[] key = iterator.key();
ByteArrayInputStreamWithPos inputStream =
new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
K value = RocksDBKeySerializationUtils.readKey(
keySerializer,
inputStream,
dataInput,
ambiguousKeyPossible);
int namespaceByteStartPos = inputStream.getPosition();
if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
previousKey = value;
nextKey = value;
}
iterator.next();
}
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
}
return nextKey != null;
}
@Override
public K next() {
if (!hasNext()) {
throw new NoSuchElementException("Failed to access state [" + state + "]");
}
K tmpKey = nextKey;
nextKey = null;
return tmpKey;
}
private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
final int namespaceBytesLength = namespaceBytes.length;
final int basicLength = namespaceBytesLength + beginPos;
if (key.length >= basicLength) {
for (int i = 0; i < namespaceBytesLength; ++i) {
if (key[beginPos + i] != namespaceBytes[i]) {
return false;
}
}
return true;
}
return false;
}
@Override
public void close() {
iterator.close();
}
}
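/**
* Snapshot strategy that writes a full, self-contained copy of the keyed state to the checkpoint
* stream. The synchronous part takes a RocksDB snapshot; the asynchronous part, implemented as an
* {@link AbstractAsyncCallableWithResources}, writes all key-groups in order to the stream.
*/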
private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
@Override
public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
long checkpointId,
long timestamp,
CheckpointStreamFactory primaryStreamFactory,
CheckpointOptions checkpointOptions) throws Exception {
long startTime = System.currentTimeMillis();
if (kvStateInformation.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
timestamp);
}
return DoneFuture.of(SnapshotResult.empty());
}
final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
localRecoveryConfig.isLocalRecoveryEnabled() &&
(CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?
() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
() -> CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory);
final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
final RocksDBFullSnapshotOperation<K> snapshotOperation =
new RocksDBFullSnapshotOperation<>(
RocksDBKeyedStateBackend.this,
supplier,
snapshotCloseableRegistry);
snapshotOperation.takeDBSnapShot();
// implementation of the async IO operation, based on FutureTask
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable =
new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
@Override
protected void acquireResources() throws Exception {
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
snapshotOperation.openCheckpointStream();
}
@Override
protected void releaseResources() throws Exception {
closeLocalRegistry();
releaseSnapshotOperationResources();
}
private void releaseSnapshotOperationResources() {
// hold the db lock while operating on the db to guard against async db disposal
snapshotOperation.releaseSnapshotResources();
}
@Override
protected void stopOperation() throws Exception {
closeLocalRegistry();
}
private void closeLocalRegistry() {
if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
try {
snapshotCloseableRegistry.close();
} catch (Exception ex) {
LOG.warn("Error closing local registry", ex);
}
}
}
@Nonnull
@Override
public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
long startTime = System.currentTimeMillis();
if (isStopped()) {
throw new IOException("RocksDB closed.");
}
snapshotOperation.writeDBSnapshot();
LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
return snapshotOperation.getSnapshotResultStateHandle();
}
};
LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
return AsyncStoppableTaskWithCallback.from(ioCallable);
}
}
private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;
public IncrementalSnapshotStrategy() {
this.savepointDelegate = new FullSnapshotStrategy();
}
@Override
public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
long checkpointId,
long checkpointTimestamp,
CheckpointStreamFactory checkpointStreamFactory,
CheckpointOptions checkpointOptions) throws Exception {
// for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained.
if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
return savepointDelegate.performSnapshot(
checkpointId,
checkpointTimestamp,
checkpointStreamFactory,
checkpointOptions);
}
if (db == null) {
throw new IOException("RocksDB closed.");
}
if (kvStateInformation.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", checkpointTimestamp);
}
return DoneFuture.of(SnapshotResult.empty());
}
SnapshotDirectory snapshotDirectory;
if (localRecoveryConfig.isLocalRecoveryEnabled()) {
// create a "permanent" snapshot directory for local recovery.
LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
if (directory.exists()) {
FileUtils.deleteDirectory(directory);
}
if (!directory.mkdirs()) {
throw new IOException("Local state base directory for checkpoint " + checkpointId +
" already exists: " + directory);
}
// introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
File rdbSnapshotDir = new File(directory, "rocks_db");
Path path = new Path(rdbSnapshotDir.toURI());
// create a "permanent" snapshot directory because local recovery is active.
snapshotDirectory = SnapshotDirectory.permanent(path);
} else {
// create a "temporary" snapshot directory because local recovery is inactive.
Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
snapshotDirectory = SnapshotDirectory.temporary(path);
}
final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
new RocksDBIncrementalSnapshotOperation<>(
RocksDBKeyedStateBackend.this,
checkpointStreamFactory,
snapshotDirectory,
checkpointId);
try {
snapshotOperation.takeSnapshot();
} catch (Exception e) {
snapshotOperation.stop();
snapshotOperation.releaseResources(true);
throw e;
}
return new FutureTask<SnapshotResult<KeyedStateHandle>>(
snapshotOperation::runSnapshot
) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
snapshotOperation.stop();
return super.cancel(mayInterruptIfRunning);
}
@Override
protected void done() {
snapshotOperation.releaseResources(isCancelled());
}
};
}
}
/**
* Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend.
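*
* <p>The numbered methods below are intended to be called in order; a sketch of the life-cycle
* (illustrative only, constructor arguments assumed to be in scope):
* <pre>{@code
* RocksDBFullSnapshotOperation<K> op =
*     new RocksDBFullSnapshotOperation<>(stateBackend, checkpointStreamSupplier, registry);
* op.takeDBSnapShot();        // 1) synchronous: snapshot RocksDB and the state meta data
* op.openCheckpointStream();  // 2) open the checkpoint output stream
* op.writeDBSnapshot();       // 3) asynchronous: write meta data and k/v data
* SnapshotResult<KeyedStateHandle> result = op.getSnapshotResultStateHandle(); // 4)
* op.releaseSnapshotResources(); // 5) release the RocksDB snapshot and clean up
* }</pre>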
*/
@VisibleForTesting
static class RocksDBFullSnapshotOperation<K>
extends AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> {
static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
private final RocksDBKeyedStateBackend<K> stateBackend;
private final KeyGroupRangeOffsets keyGroupRangeOffsets;
private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
private final CloseableRegistry snapshotCloseableRegistry;
private final ResourceGuard.Lease dbLease;
private Snapshot snapshot;
private ReadOptions readOptions;
/**
* The state meta data.
*/
private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
/**
* The copied column family handles together with their registered meta info.
*/
private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> copiedMeta;
private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators;
private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
private DataOutputView outputView;
RocksDBFullSnapshotOperation(
RocksDBKeyedStateBackend<K> stateBackend,
SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
CloseableRegistry registry) throws IOException {
this.stateBackend = stateBackend;
this.checkpointStreamSupplier = checkpointStreamSupplier;
this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
this.snapshotCloseableRegistry = registry;
this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
}
/**
* 1) Create a snapshot object from RocksDB.
*/
public void takeDBSnapShot() {
Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
this.copiedMeta = new ArrayList<>(stateBackend.kvStateInformation.size());
for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
stateBackend.kvStateInformation.values()) {
// snapshot meta info
this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
this.copiedMeta.add(tuple2);
}
this.snapshot = stateBackend.db.getSnapshot();
}
/**
* 2) Open the CheckpointStateOutputStream through the checkpointStreamFactory into which we will write.
*
* @throws Exception if the checkpoint stream cannot be created.
*/
public void openCheckpointStream() throws Exception {
Preconditions.checkArgument(checkpointStreamWithResultProvider == null,
"Output stream for snapshot is already set.");
checkpointStreamWithResultProvider = checkpointStreamSupplier.get();
snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
outputView = new DataOutputViewStreamWrapper(
checkpointStreamWithResultProvider.getCheckpointOutputStream());
}
/**
* 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
*
* @throws IOException if writing the snapshot data fails.
*/
public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {
if (null == snapshot) {
throw new IOException("No snapshot available. Might be released due to cancellation.");
}
Preconditions.checkNotNull(checkpointStreamWithResultProvider, "No output stream to write snapshot.");
writeKVStateMetaData();
writeKVStateData();
}
/**
* 4) Returns a snapshot result for the completed snapshot.
*
* @return snapshot result for the completed snapshot.
*/
@Nonnull
public SnapshotResult<KeyedStateHandle> getSnapshotResultStateHandle() throws IOException {
if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
SnapshotResult<StreamStateHandle> res =
checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
checkpointStreamWithResultProvider = null;
return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(res, keyGroupRangeOffsets);
}
return SnapshotResult.empty();
}
/**
* 5) Release the snapshot object for RocksDB and clean up.
*/
public void releaseSnapshotResources() {
checkpointStreamWithResultProvider = null;
if (null != kvStateIterators) {
for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
IOUtils.closeQuietly(kvStateIterator.f0);
}
kvStateIterators = null;
}
if (null != snapshot) {
if (null != stateBackend.db) {
stateBackend.db.releaseSnapshot(snapshot);
}
IOUtils.closeQuietly(snapshot);
snapshot = null;
}
if (null != readOptions) {
IOUtils.closeQuietly(readOptions);
readOptions = null;
}
this.dbLease.close();
}
private void writeKVStateMetaData() throws IOException {
this.kvStateIterators = new ArrayList<>(copiedMeta.size());
int kvStateId = 0;
//retrieve iterators for all registered k/v states
readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : copiedMeta) {
RocksIteratorWrapper rocksIteratorWrapper =
getRocksIterator(stateBackend.db, tuple2.f0, tuple2.f1, readOptions);
kvStateIterators.add(new Tuple2<>(rocksIteratorWrapper, kvStateId));
++kvStateId;
}
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
// TODO: this code assumes that writing a serializer is threadsafe, we should support
// getting a serialized form already at state registration time in the future
stateBackend.getKeySerializer(),
stateMetaInfoSnapshots,
!Objects.equals(
UncompressedStreamCompressionDecorator.INSTANCE,
stateBackend.keyGroupCompressionDecorator));
serializationProxy.write(outputView);
}
private void writeKVStateData() throws IOException, InterruptedException {
byte[] previousKey = null;
byte[] previousValue = null;
DataOutputView kgOutView = null;
OutputStream kgOutStream = null;
CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
checkpointStreamWithResultProvider.getCheckpointOutputStream();
try {
// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
// handover complete, null out to prevent double close
kvStateIterators = null;
//preamble: setup with first key-group as our lookahead
if (mergeIterator.isValid()) {
//begin first key-group by recording the offset
keyGroupRangeOffsets.setKeyGroupOffset(
mergeIterator.keyGroup(),
checkpointOutputStream.getPos());
//write the k/v-state id as metadata
kgOutStream = stateBackend.keyGroupCompressionDecorator.
decorateWithCompression(checkpointOutputStream);
kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(mergeIterator.kvStateId());
previousKey = mergeIterator.key();
previousValue = mergeIterator.value();
mergeIterator.next();
}
//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
while (mergeIterator.isValid()) {
assert (!hasMetaDataFollowsFlag(previousKey));
//set signal in first key byte that meta data will follow in the stream after this k/v pair
if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
//be cooperative and check for interruption from time to time in the hot loop
checkInterrupted();
setMetaDataFollowsFlagInKey(previousKey);
}
writeKeyValuePair(previousKey, previousValue, kgOutView);
//write meta data if we have to
if (mergeIterator.isNewKeyGroup()) {
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
// this will just close the outer stream
kgOutStream.close();
//begin new key-group
keyGroupRangeOffsets.setKeyGroupOffset(
mergeIterator.keyGroup(),
checkpointOutputStream.getPos());
//write the k/v-state id as metadata
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutStream = stateBackend.keyGroupCompressionDecorator.
decorateWithCompression(checkpointOutputStream);
kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
kgOutView.writeShort(mergeIterator.kvStateId());
} else if (mergeIterator.isNewKeyValueState()) {
//write the k/v-state
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(mergeIterator.kvStateId());
}
//request next k/v pair
previousKey = mergeIterator.key();
previousValue = mergeIterator.value();
mergeIterator.next();
}
}
//epilogue: write last key-group
if (previousKey != null) {
assert (!hasMetaDataFollowsFlag(previousKey));
setMetaDataFollowsFlagInKey(previousKey);
writeKeyValuePair(previousKey, previousValue, kgOutView);
//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
// this will just close the outer stream
kgOutStream.close();
kgOutStream = null;
}
} finally {
// this will just close the outer stream
IOUtils.closeQuietly(kgOutStream);
}
}
private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
}
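/**
* Sets the most-significant bit of the first key byte to signal that meta data follows this
* k/v pair in the stream; {@link #clearMetaDataFollowsFlag(byte[])} removes the flag and
* {@link #hasMetaDataFollowsFlag(byte[])} tests for it.
*/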
static void setMetaDataFollowsFlagInKey(byte[] key) {
key[0] |= FIRST_BIT_IN_BYTE_MASK;
}
static void clearMetaDataFollowsFlag(byte[] key) {
key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
}
static boolean hasMetaDataFollowsFlag(byte[] key) {
return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
}
private static void checkInterrupted() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("RocksDB snapshot interrupted.");
}
}
@Override
protected void acquireResources() throws Exception {
stateBackend.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
openCheckpointStream();
}
@Override
protected void releaseResources() {
closeLocalRegistry();
releaseSnapshotOperationResources();
}
private void releaseSnapshotOperationResources() {
// hold the db lock while operating on the db to guard against async db disposal
releaseSnapshotResources();
}
@Override
protected void stopOperation() {
closeLocalRegistry();
}
private void closeLocalRegistry() {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
try {
snapshotCloseableRegistry.close();
} catch (Exception ex) {
LOG.warn("Error closing local registry", ex);
}
}
}
@Nonnull
@Override
public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
long startTime = System.currentTimeMillis();
if (isStopped()) {
throw new IOException("RocksDB closed.");
}
writeDBSnapshot();
LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime));
return getSnapshotResultStateHandle();
}
}
/**
* Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
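*
* <p>A sketch of the life-cycle, mirroring how {@link IncrementalSnapshotStrategy} drives it
* (illustrative only):
* <pre>{@code
* op.takeSnapshot();                    // synchronous: native RocksDB checkpoint via hard links
* SnapshotResult<KeyedStateHandle> r =
*     op.runSnapshot();                 // asynchronous: upload files and build the state handle
* // on cancellation: op.stop(); in all cases, finally: op.releaseResources(canceled);
* }</pre>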
*/
private static final class RocksDBIncrementalSnapshotOperation<K> {
/** The backend which we snapshot. */
private final RocksDBKeyedStateBackend<K> stateBackend;
/** Stream factory that creates the output streams to DFS. */
private final CheckpointStreamFactory checkpointStreamFactory;
/** Id for the current checkpoint. */
private final long checkpointId;
/** All sst files that were part of the last previously completed checkpoint. */
private Set<StateHandleID> baseSstFiles;
/** The state meta data. */
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>();
/** Local directory for the RocksDB native backup. */
private SnapshotDirectory localBackupDirectory;
/** Registry for all opened i/o streams. */
private final CloseableRegistry closeableRegistry = new CloseableRegistry();
/** New sst files since the last completed checkpoint. */
private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
/** Handles to the misc files in the current snapshot. */
private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
/** This lease protects from concurrent disposal of the native rocksdb instance. */
private final ResourceGuard.Lease dbLease;
private SnapshotResult<StreamStateHandle> metaStateHandle = null;
private RocksDBIncrementalSnapshotOperation(
RocksDBKeyedStateBackend<K> stateBackend,
CheckpointStreamFactory checkpointStreamFactory,
SnapshotDirectory localBackupDirectory,
long checkpointId) throws IOException {
this.stateBackend = stateBackend;
this.checkpointStreamFactory = checkpointStreamFactory;
this.checkpointId = checkpointId;
this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
this.localBackupDirectory = localBackupDirectory;
}
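/**
* Copies the file under the given path from the local backup directory to a new checkpoint
* output stream in {@link CheckpointedStateScope#SHARED} scope and returns the resulting
* handle, or {@code null} if the stream was closed concurrently.
*/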
private StreamStateHandle materializeStateData(Path filePath) throws Exception {
FSDataInputStream inputStream = null;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
try {
final byte[] buffer = new byte[8 * 1024];
FileSystem backupFileSystem = localBackupDirectory.getFileSystem();
inputStream = backupFileSystem.open(filePath);
closeableRegistry.registerCloseable(inputStream);
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
closeableRegistry.registerCloseable(outputStream);
while (true) {
int numBytes = inputStream.read(buffer);
if (numBytes == -1) {
break;
}
outputStream.write(buffer, 0, numBytes);
}
StreamStateHandle result = null;
if (closeableRegistry.unregisterCloseable(outputStream)) {
result = outputStream.closeAndGetHandle();
outputStream = null;
}
return result;
} finally {
if (closeableRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
if (closeableRegistry.unregisterCloseable(outputStream)) {
outputStream.close();
}
}
}
@Nonnull
private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
LocalRecoveryConfig localRecoveryConfig = stateBackend.localRecoveryConfig;
CheckpointStreamWithResultProvider streamWithResultProvider =
localRecoveryConfig.isLocalRecoveryEnabled() ?
CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
checkpointStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
checkpointStreamFactory);
try {
closeableRegistry.registerCloseable(streamWithResultProvider);
//no need for compression scheme support because sst-files are already compressed
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
stateBackend.keySerializer,
stateMetaInfoSnapshots,
false);
DataOutputView out =
new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
serializationProxy.write(out);
if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
SnapshotResult<StreamStateHandle> result =
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
streamWithResultProvider = null;
return result;
} else {
throw new IOException("Stream already closed and cannot return a handle.");
}
} finally {
if (streamWithResultProvider != null) {
if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
IOUtils.closeQuietly(streamWithResultProvider);
}
}
}
}
void takeSnapshot() throws Exception {
final long lastCompletedCheckpoint;
// use the last completed checkpoint as the comparison base.
synchronized (stateBackend.materializedSstFiles) {
lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
}
LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
}
LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory);
if (localBackupDirectory.exists()) {
throw new IllegalStateException("Unexpected existence of the backup directory.");
}
// create hard links of living files in the snapshot path
try (Checkpoint checkpoint = Checkpoint.create(stateBackend.db)) {
checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
}
}
@Nonnull
SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
// write meta data
metaStateHandle = materializeMetaData();
// sanity checks - they should never fail
Preconditions.checkNotNull(metaStateHandle,
"Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
"Metadata for job manager was not properly created.");
// write state data
Preconditions.checkState(localBackupDirectory.exists());
FileStatus[] fileStatuses = localBackupDirectory.listStatus();
if (fileStatuses != null) {
for (FileStatus fileStatus : fileStatuses) {
final Path filePath = fileStatus.getPath();
final String fileName = filePath.getName();
final StateHandleID stateHandleID = new StateHandleID(fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
final boolean existsAlready =
baseSstFiles != null && baseSstFiles.contains(stateHandleID);
if (existsAlready) {
// we introduce a placeholder state handle, that is replaced with the
// original from the shared state registry (created from a previous checkpoint)
sstFiles.put(
stateHandleID,
new PlaceholderStreamStateHandle());
} else {
sstFiles.put(stateHandleID, materializeStateData(filePath));
}
} else {
StreamStateHandle fileHandle = materializeStateData(filePath);
miscFiles.put(stateHandleID, fileHandle);
}
}
}
synchronized (stateBackend.materializedSstFiles) {
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
stateBackend.backendUID,
stateBackend.keyGroupRange,
checkpointId,
sstFiles,
miscFiles,
metaStateHandle.getJobManagerOwnedSnapshot());
StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot();
DirectoryStateHandle directoryStateHandle = null;
try {
directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
} catch (IOException ex) {
Exception collector = ex;
try {
taskLocalSnapshotMetaDataStateHandle.discardState();
} catch (Exception discardEx) {
collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
}
LOG.warn("Problem with local state snapshot.", collector);
}
if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
new IncrementalLocalKeyedStateHandle(
stateBackend.backendUID,
checkpointId,
directoryStateHandle,
stateBackend.keyGroupRange,
taskLocalSnapshotMetaDataStateHandle,
sstFiles.keySet());
return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
return SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
}
void stop() {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
try {
closeableRegistry.close();
} catch (IOException e) {
LOG.warn("Could not properly close io streams.", e);
}
}
}
void releaseResources(boolean canceled) {
dbLease.close();
if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
try {
closeableRegistry.close();
} catch (IOException e) {
LOG.warn("Exception on closing registry.", e);
}
}
try {
if (localBackupDirectory.exists()) {
LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory);
boolean cleanupOk = localBackupDirectory.cleanup();
if (!cleanupOk) {
LOG.debug("Could not properly cleanup local RocksDB backup directory.");
}
}
} catch (IOException e) {
LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
}
if (canceled) {
Collection<StateObject> statesToDiscard =
new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
try {
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
} catch (Exception e) {
LOG.warn("Could not properly discard states.", e);
}
if (localBackupDirectory.isSnapshotCompleted()) {
try {
DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
if (directoryStateHandle != null) {
directoryStateHandle.discardState();
}
} catch (Exception e) {
LOG.warn("Could not properly discard local state.", e);
}
}
}
}
}
public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
return new RocksIteratorWrapper(db.newIterator());
}
public static RocksIteratorWrapper getRocksIterator(
RocksDB db,
ColumnFamilyHandle columnFamilyHandle) {
return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
}
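/**
* Creates an iterator over the given column family. If the registered meta info carries a
* {@link StateSnapshotTransformer}, the iterator filters or transforms values on the fly via
* {@link TransformingRocksIteratorWrapper}; otherwise a plain {@link RocksIteratorWrapper} is returned.
*/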
@SuppressWarnings("unchecked")
private static RocksIteratorWrapper getRocksIterator(
RocksDB db,
ColumnFamilyHandle columnFamilyHandle,
RegisteredStateMetaInfoBase metaInfo,
ReadOptions readOptions) {
StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>)
((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer();
}
RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
return stateSnapshotTransformer == null ?
new RocksIteratorWrapper(rocksIterator) :
new TransformingRocksIteratorWrapper(rocksIterator, stateSnapshotTransformer);
}
/**
* Encapsulates the logic and resources in connection with creating priority queue state structures.
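*
* <p>A usage sketch (illustrative only; {@code priorityQueueSetFactory} is an instance of this
* class, {@code MyElement} a hypothetical type satisfying the factory's type bounds, and
* {@code elementSerializer} a matching {@link TypeSerializer}):
* <pre>{@code
* KeyGroupedInternalPriorityQueue<MyElement> queue =
*     priorityQueueSetFactory.create("timer-state", elementSerializer);
* }</pre>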
*/
class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
/** Default cache size per key-group. */
private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
/** A shared buffer to serialize elements for the priority queue. */
@Nonnull
private final ByteArrayDataOutputView sharedElementOutView;
/** A shared buffer to de-serialize elements for the priority queue. */
@Nonnull
private final ByteArrayDataInputView sharedElementInView;
RocksDBPriorityQueueSetFactory() {
this.sharedElementOutView = new ByteArrayDataOutputView();
this.sharedElementInView = new ByteArrayDataInputView();
}
@Nonnull
@Override
public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
final Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> metaInfoTuple =
tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer);
final ColumnFamilyHandle columnFamilyHandle = metaInfoTuple.f0;
return new KeyGroupPartitionedPriorityQueue<>(
KeyExtractorFunction.forKeyedObjects(),
PriorityComparator.forPriorityComparableObjects(),
new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, RocksDBCachingPriorityQueueSet<T>>() {
@Nonnull
@Override
public RocksDBCachingPriorityQueueSet<T> create(
int keyGroupId,
int numKeyGroups,
@Nonnull KeyExtractorFunction<T> keyExtractor,
@Nonnull PriorityComparator<T> elementPriorityComparator) {
TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(DEFAULT_CACHES_SIZE);
return new RocksDBCachingPriorityQueueSet<>(
keyGroupId,
keyGroupPrefixBytes,
db,
columnFamilyHandle,
byteOrderedElementSerializer,
sharedElementOutView,
sharedElementInView,
writeBatchWrapper,
orderedSetCache
);
}
},
keyGroupRange,
numberOfKeyGroups);
}
}
@Nonnull
private <T> Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tryRegisterPriorityQueueMetaInfo(
@Nonnull String stateName,
@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> metaInfoTuple =
kvStateInformation.get(stateName);
if (metaInfoTuple == null) {
final ColumnFamilyHandle columnFamilyHandle = createColumnFamily(stateName);
RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
metaInfoTuple = new Tuple2<>(columnFamilyHandle, metaInfo);
kvStateInformation.put(stateName, metaInfoTuple);
} else {
// TODO we implement the simple way of supporting the current functionality, mimicking keyed state
// because this should be reworked in FLINK-9376 and then we should have a common algorithm over
// StateMetaInfoSnapshot that avoids this code duplication.
StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateName);
Preconditions.checkState(
restoredMetaInfoSnapshot != null,
"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
" but its corresponding restored snapshot cannot be found.");
StateMetaInfoSnapshot.CommonSerializerKeys serializerKey =
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.getTypeSerializer(serializerKey);
if (metaInfoTypeSerializer != byteOrderedElementSerializer) {
CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
metaInfoTypeSerializer,
null,
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
byteOrderedElementSerializer);
if (compatibilityResult.isRequiresMigration()) {
throw new FlinkRuntimeException(StateMigrationException.notSupported());
}
// update meta info with new serializer
metaInfoTuple.f1 =
new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
}
}
return metaInfoTuple;
}
@Override
public boolean requiresLegacySynchronousTimerSnapshots() {
return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
}
}