blob: 38c7076638d24dcd0d728ee002e8ba6ce26e35cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state.snapshot;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Abstract base class for {@link SnapshotStrategy} implementations for the RocksDB state backend.
*
* @param <K> type of the backend keys.
* @param <R> type of the {@link SnapshotResources}; not referenced within this base class.
*/
public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources>
implements CheckpointListener,
SnapshotStrategy<
KeyedStateHandle,
RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources>,
AutoCloseable {
/** Logger shared by this class and its nested classes. */
private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class);
/** Description of this snapshot strategy, exposed through {@link #getDescription()}. */
@Nonnull private final String description;
/**
* RocksDB instance from the backend. Unlike the other fields it is not final, although nothing
* in this class reassigns it after construction.
*/
@Nonnull protected RocksDB db;
/** Resource guard for the RocksDB instance. */
@Nonnull protected final ResourceGuard rocksDBResourceGuard;
/** The key serializer of the backend. */
@Nonnull protected final TypeSerializer<K> keySerializer;
/** Key/Value state meta info from the backend. */
@Nonnull protected final LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
/** The key-group range for the task. */
@Nonnull protected final KeyGroupRange keyGroupRange;
/** Number of bytes in the key-group prefix. */
@Nonnegative protected final int keyGroupPrefixBytes;
/** The configuration for local recovery. */
@Nonnull protected final LocalRecoveryConfig localRecoveryConfig;
/** Base path of the RocksDB instance. */
@Nonnull protected final File instanceBasePath;
/**
* The local directory name of the current snapshot strategy: the string form of {@link
* #backendUID} with all dashes removed.
*/
protected final String localDirectoryName;
/**
* UUID identifying the backend. It is passed into the local keyed state handle created for local
* snapshots and is the source of {@link #localDirectoryName}.
*/
@Nonnull protected final UUID backendUID;
public RocksDBSnapshotStrategyBase(
@Nonnull String description,
@Nonnull RocksDB db,
@Nonnull ResourceGuard rocksDBResourceGuard,
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull LocalRecoveryConfig localRecoveryConfig,
@Nonnull File instanceBasePath,
@Nonnull UUID backendUID) {
this.db = db;
this.rocksDBResourceGuard = rocksDBResourceGuard;
this.keySerializer = keySerializer;
this.kvStateInformation = kvStateInformation;
this.keyGroupRange = keyGroupRange;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.localRecoveryConfig = localRecoveryConfig;
this.description = description;
this.instanceBasePath = instanceBasePath;
this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
this.backendUID = backendUID;
}
/**
 * Returns the description that was handed to the constructor.
 *
 * @return the description of this snapshot strategy.
 */
@Nonnull
public String getDescription() {
    return this.description;
}
/**
 * Prepares the resources for a native RocksDB snapshot: resolves the local snapshot directory,
 * collects the state meta data via {@link #snapshotMetaData(long, List)} and then takes the
 * native RocksDB checkpoint into that directory.
 *
 * @param checkpointId id of the checkpoint being taken.
 * @return the resources bundling the snapshot directory, the previous snapshot and the state
 *     meta info snapshots.
 * @throws Exception if preparing the directory or taking the native checkpoint fails.
 */
@Override
public NativeRocksDBSnapshotResources syncPrepareResources(long checkpointId) throws Exception {
    final SnapshotDirectory backupDirectory = prepareLocalSnapshotDirectory(checkpointId);
    LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupDirectory);

    // Pre-size the list with the number of currently registered states.
    final int registeredStates = kvStateInformation.size();
    final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(registeredStates);
    final PreviousSnapshot previous = snapshotMetaData(checkpointId, metaInfoSnapshots);

    takeDBNativeCheckpoint(backupDirectory);

    return new NativeRocksDBSnapshotResources(backupDirectory, previous, metaInfoSnapshots);
}
/**
* Captures the state meta data for the given checkpoint.
*
* <p>Called from {@link #syncPrepareResources(long)} before the native RocksDB checkpoint is
* taken. The list handed in is freshly created and empty; presumably implementations add the meta
* info snapshot of each registered state to it (confirm against the subclasses).
*
* @param checkpointId id of the checkpoint being taken.
* @param stateMetaInfoSnapshots list supplied by the caller, which afterwards stores it in the
*     returned snapshot resources.
* @return the {@link PreviousSnapshot} that the caller stores in the snapshot resources.
*/
protected abstract PreviousSnapshot snapshotMetaData(
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots);
/**
 * Takes a native RocksDB checkpoint into the given directory while holding a lease on the
 * RocksDB resource guard.
 *
 * <p>If anything fails, the output directory is cleaned up; a failure of that cleanup is merged
 * with the original failure via {@link ExceptionUtils#firstOrSuppressed} before rethrowing.
 *
 * @param outputDirectory snapshot directory that receives the checkpoint.
 * @throws Exception if acquiring the lease or creating the checkpoint fails.
 */
private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
        throws Exception {
    // create hard links of living files in the output path
    try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
            Checkpoint nativeCheckpoint = Checkpoint.create(db)) {
        final String targetPath = outputDirectory.getDirectory().toString();
        nativeCheckpoint.createCheckpoint(targetPath);
    } catch (Exception checkpointFailure) {
        Exception toRethrow = checkpointFailure;
        try {
            outputDirectory.cleanup();
        } catch (IOException cleanupFailure) {
            toRethrow = ExceptionUtils.firstOrSuppressed(cleanupFailure, toRethrow);
        }
        throw toRethrow;
    }
}
/**
 * Resolves the directory that will receive the native RocksDB checkpoint.
 *
 * <p>Without local backup, a "temporary" snapshot directory {@code chk-<checkpointId>} below the
 * instance base path is returned. With local backup, a "permanent" snapshot directory is
 * returned that lives inside the subtask-specific checkpoint directory of the local state
 * directory provider.
 *
 * @param checkpointId id of the checkpoint being taken.
 * @return the snapshot directory to write the native checkpoint to.
 * @throws IOException if the checkpoint base directory cannot be created, or a stale snapshot
 *     directory cannot be removed, or the permanent snapshot directory cannot be set up.
 */
@Nonnull
protected SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId)
        throws IOException {
    if (!localRecoveryConfig.isLocalBackupEnabled()) {
        // create a "temporary" snapshot directory because local recovery is inactive.
        final File temporaryDir = new File(instanceBasePath, "chk-" + checkpointId);
        return SnapshotDirectory.temporary(temporaryDir);
    }

    // create a "permanent" snapshot directory for local recovery.
    final LocalSnapshotDirectoryProvider provider =
            localRecoveryConfig
                    .getLocalStateDirectoryProvider()
                    .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
    final File checkpointBaseDir = provider.subtaskSpecificCheckpointDirectory(checkpointId);

    final boolean baseDirAvailable = checkpointBaseDir.exists() || checkpointBaseDir.mkdirs();
    if (!baseDirAvailable) {
        throw new IOException(
                "Local state base directory for checkpoint "
                        + checkpointId
                        + " does not exist and could not be created: "
                        + checkpointBaseDir);
    }

    // introduces an extra directory because RocksDB wants a non-existing directory for
    // native checkpoints.
    // append localDirectoryName here to solve directory collision problem when two stateful
    // operators chained in one task.
    final File rdbSnapshotDir = new File(checkpointBaseDir, localDirectoryName);
    if (rdbSnapshotDir.exists()) {
        FileUtils.deleteDirectory(rdbSnapshotDir);
    }
    final Path snapshotPath = rdbSnapshotDir.toPath();

    // create a "permanent" snapshot directory because local recovery is active.
    try {
        return SnapshotDirectory.permanent(snapshotPath);
    } catch (IOException creationFailure) {
        IOException toRethrow = creationFailure;
        try {
            FileUtils.deleteDirectory(checkpointBaseDir);
        } catch (IOException deletionFailure) {
            toRethrow = ExceptionUtils.firstOrSuppressed(deletionFailure, toRethrow);
        }
        throw toRethrow;
    }
}
/**
 * Best-effort cleanup after a snapshot that did not complete. Failures are logged as warnings
 * and never propagated.
 *
 * <p>First closes the registry of temporary resources. Then, only if the local backup directory
 * reports its snapshot as completed, obtains the directory state handle and discards it.
 *
 * @param tmpResourcesRegistry registry of temporary resources to close.
 * @param localBackupDirectory local snapshot directory whose state may need discarding.
 */
protected void cleanupIncompleteSnapshot(
        @Nonnull CloseableRegistry tmpResourcesRegistry,
        @Nonnull SnapshotDirectory localBackupDirectory) {
    try {
        tmpResourcesRegistry.close();
    } catch (Exception e) {
        LOG.warn("Could not properly clean tmp resources.", e);
    }

    if (!localBackupDirectory.isSnapshotCompleted()) {
        return;
    }

    try {
        final DirectoryStateHandle completedHandle =
                localBackupDirectory.completeSnapshotAndGetHandle();
        if (completedHandle != null) {
            completedHandle.discardState();
        }
    } catch (Exception e) {
        LOG.warn("Could not properly discard local state.", e);
    }
}
/**
* Writes the keyed backend meta data (key serializer plus state meta info snapshots) to a
* checkpoint stream and returns the resulting state handle(s).
*
* <p>With local backup enabled a duplicating stream is used, otherwise a simple stream; both use
* the {@link CheckpointedStateScope#EXCLUSIVE} scope. The stream is registered with {@code
* snapshotCloseableRegistry} while it is being written. On success a closeable that quietly
* discards the result is registered with {@code tmpResourcesRegistry}.
*
* @param snapshotCloseableRegistry registry that tracks the open stream while it is written.
* @param tmpResourcesRegistry registry that receives the discard hook for the finished result.
* @param stateMetaInfoSnapshots state meta info snapshots to serialize.
* @param checkpointId id of the checkpoint being taken.
* @param checkpointStreamFactory factory for the checkpoint output stream.
* @return the snapshot result of the finalized meta data stream.
* @throws Exception if writing fails, or an {@link IOException} if the stream was no longer
*     registered (already closed) when the result was about to be finalized.
*/
@Nonnull
protected SnapshotResult<StreamStateHandle> materializeMetaData(
@Nonnull CloseableRegistry snapshotCloseableRegistry,
@Nonnull CloseableRegistry tmpResourcesRegistry,
@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
long checkpointId,
@Nonnull CheckpointStreamFactory checkpointStreamFactory)
throws Exception {
// Local backup needs a stream that also writes through the local state directory provider.
CheckpointStreamWithResultProvider streamWithResultProvider =
localRecoveryConfig.isLocalBackupEnabled()
? CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
checkpointStreamFactory,
localRecoveryConfig
.getLocalStateDirectoryProvider()
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
: CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
// Track the open stream in the registry for as long as it is being written.
snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);
try {
// no need for compression scheme support because sst-files are already compressed
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
keySerializer, stateMetaInfoSnapshots, false);
DataOutputView out =
new DataOutputViewStreamWrapper(
streamWithResultProvider.getCheckpointOutputStream());
serializationProxy.write(out);
// A successful unregister means the registry has not closed the stream in the meantime.
if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
SnapshotResult<StreamStateHandle> result =
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
// Null out the reference so the finally block is handed null instead of the finalized
// stream.
streamWithResultProvider = null;
// Discard the written result when the temporary resources registry is closed.
tmpResourcesRegistry.registerCloseable(
() -> StateUtil.discardStateObjectQuietly(result));
return result;
} else {
throw new IOException("Stream already closed and cannot return a handle.");
}
} finally {
// Only close here if the stream is still registered, i.e. writing failed before the
// unregister call above.
if (snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)) {
IOUtils.closeQuietly(streamWithResultProvider);
}
}
}
/**
* Closes this snapshot strategy. The implementation is left to subclasses.
*
* @throws IOException declared so that implementations may report I/O failures on close.
*/
@Override
public abstract void close() throws IOException;
/** Common operation in native rocksdb snapshot result supplier. */
protected abstract class RocksDBSnapshotOperation
        implements SnapshotResultSupplier<KeyedStateHandle> {

    /** Id for the current checkpoint. */
    protected final long checkpointId;

    /** Stream factory that creates the output streams to DFS. */
    @Nonnull protected final CheckpointStreamFactory checkpointStreamFactory;

    /** The state meta data. */
    @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

    /** Local directory for the RocksDB native backup. */
    @Nonnull protected final SnapshotDirectory localBackupDirectory;

    /** Registry for temporary resources of this operation; created empty per operation. */
    @Nonnull protected final CloseableRegistry tmpResourcesRegistry;

    /**
     * Creates the operation and a fresh registry for its temporary resources.
     *
     * @param checkpointId id for the current checkpoint.
     * @param checkpointStreamFactory stream factory that creates the output streams.
     * @param localBackupDirectory local directory for the RocksDB native backup.
     * @param stateMetaInfoSnapshots the state meta data.
     */
    protected RocksDBSnapshotOperation(
            long checkpointId,
            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
            @Nonnull SnapshotDirectory localBackupDirectory,
            @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        this.tmpResourcesRegistry = new CloseableRegistry();
        this.checkpointId = checkpointId;
        this.checkpointStreamFactory = checkpointStreamFactory;
        this.localBackupDirectory = localBackupDirectory;
        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
    }

    /**
     * Completes the local backup directory and, if possible, builds the local keyed state handle.
     *
     * <p>The directory snapshot is always completed first, even when no local stream state
     * handle is given.
     *
     * @param localStreamStateHandle local handle of the meta data stream, may be {@code null}.
     * @param sharedState shared state files to reference from the local handle.
     * @return the local keyed state handle, or empty if either the directory handle or the local
     *     stream state handle is missing.
     * @throws IOException if completing the local snapshot directory fails.
     */
    protected Optional<KeyedStateHandle> getLocalSnapshot(
            @Nullable StreamStateHandle localStreamStateHandle,
            List<HandleAndLocalPath> sharedState)
            throws IOException {
        final DirectoryStateHandle localDirectoryHandle =
                localBackupDirectory.completeSnapshotAndGetHandle();

        if (localDirectoryHandle == null || localStreamStateHandle == null) {
            return Optional.empty();
        }

        final KeyedStateHandle localKeyedHandle =
                new IncrementalLocalKeyedStateHandle(
                        backendUID,
                        checkpointId,
                        localDirectoryHandle,
                        keyGroupRange,
                        localStreamStateHandle,
                        sharedState);
        return Optional.of(localKeyedHandle);
    }
}
/** A {@link SnapshotResources} for native rocksdb snapshot. */
protected static class NativeRocksDBSnapshotResources implements SnapshotResources {

    /** Directory that holds the native RocksDB checkpoint. */
    @Nonnull protected final SnapshotDirectory snapshotDirectory;

    /** Previous snapshot that was captured together with the state meta data. */
    @Nonnull protected final PreviousSnapshot previousSnapshot;

    /** State meta info snapshots captured for this snapshot. */
    @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

    /**
     * Bundles the resources of one native RocksDB snapshot.
     *
     * @param snapshotDirectory directory that holds the native checkpoint.
     * @param previousSnapshot previous snapshot captured with the meta data.
     * @param stateMetaInfoSnapshots state meta info snapshots.
     */
    public NativeRocksDBSnapshotResources(
            SnapshotDirectory snapshotDirectory,
            PreviousSnapshot previousSnapshot,
            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
        this.previousSnapshot = previousSnapshot;
        this.snapshotDirectory = snapshotDirectory;
    }

    /**
     * Cleans up the snapshot directory if it exists. An unsuccessful cleanup is logged at debug
     * level and an {@link IOException} at warn level; neither is propagated.
     */
    @Override
    public void release() {
        try {
            if (!snapshotDirectory.exists()) {
                return;
            }
            LOG.trace(
                    "Running cleanup for local RocksDB backup directory {}.",
                    snapshotDirectory);
            if (!snapshotDirectory.cleanup()) {
                LOG.debug("Could not properly cleanup local RocksDB backup directory.");
            }
        } catch (IOException e) {
            LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
        }
    }
}
/**
* Shared {@link PreviousSnapshot} without any confirmed sst files. It is built from an empty
* collection, so {@code getUploaded} returns an empty result for every file name.
*/
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
new PreviousSnapshot(Collections.emptyList());
/** Previous snapshot with uploaded sst files. */
protected static class PreviousSnapshot {

    /** Confirmed sst files, keyed by local path and mapped to their uploaded state handle. */
    @Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;

    /**
     * Indexes the confirmed sst files by their local path.
     *
     * <p>The map is collected without a merge function, so two entries with the same local path
     * are rejected with an {@link IllegalStateException}.
     *
     * @param confirmedSstFiles confirmed files of the previous snapshot; {@code null} is treated
     *     as "no confirmed files".
     */
    protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
        if (confirmedSstFiles == null) {
            this.confirmedSstFiles = Collections.emptyMap();
        } else {
            this.confirmedSstFiles =
                    confirmedSstFiles.stream()
                            .collect(
                                    Collectors.toMap(
                                            HandleAndLocalPath::getLocalPath,
                                            HandleAndLocalPath::getHandle));
        }
    }

    /**
     * Looks up whether the given file was already uploaded and confirmed.
     *
     * @param filename local path of the sst file.
     * @return a placeholder handle for the confirmed upload, or empty if the file is not among
     *     the confirmed files.
     */
    protected Optional<StreamStateHandle> getUploaded(String filename) {
        if (!confirmedSstFiles.containsKey(filename)) {
            // Don't use any uploaded but not confirmed handles because they might be deleted
            // (by TM) if the previous checkpoint failed. See FLINK-25395
            return Optional.empty();
        }

        final StreamStateHandle confirmedHandle = confirmedSstFiles.get(filename);
        // We introduce a placeholder state handle to reduce network transfer overhead,
        // it will be replaced by the original handle from the shared state registry
        // (created from a previous checkpoint).
        final StreamStateHandle placeholder =
                new PlaceholderStreamStateHandle(
                        confirmedHandle.getStreamStateHandleID(),
                        confirmedHandle.getStateSize(),
                        FileMergingSnapshotManager.isFileMergingHandle(confirmedHandle));
        return Optional.of(placeholder);
    }
}
}