blob: 8d2b41a532d1004f1daa7c8b304e2339107ff1cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.state.changelog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.restore.ChangelogRestoreTarget;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationTarget;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform;
import static org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link KeyedStateBackend} that keeps state on the underlying delegated keyed state backend as
* well as on the state change log.
*
* @param <K> The key by which state is keyed.
*/
@Internal
public class ChangelogKeyedStateBackend<K>
implements CheckpointableKeyedStateBackend<K>,
CheckpointListener,
TestableKeyedStateBackend<K>,
InternalCheckpointListener,
MaterializationTarget {
/** Logger used by this backend. */
private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);
/**
* ChangelogStateBackend only supports CheckpointType.CHECKPOINT; the rest of the information in
* CheckpointOptions is not used in Snapshotable#snapshot(). More details in FLINK-23441.
*/
private static final CheckpointOptions CHECKPOINT_OPTIONS =
new CheckpointOptions(
CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
/** The delegated (wrapped) keyed state backend. */
private final AbstractKeyedStateBackend<K> keyedStateBackend;
/**
* This is the cache maintained by this ChangelogKeyedStateBackend itself. It is not the same as
* the one of the underlying delegated keyedStateBackend. InternalKvState is a delegated state.
*/
private final Map<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
/** Creates and looks up the changelog wrappers of key-value states and priority queues. */
private final ChangelogStateFactory changelogStateFactory;
/** Used to initialize serializers of state descriptors that have none yet. */
private final ExecutionConfig executionConfig;
/** Handed to {@link TtlStateFactory} when a keyed state is created. */
private final TtlTimeProvider ttlTimeProvider;
/** Writer that the state change loggers append to and that snapshots persist from. */
private final StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter;
/** Collects resources that are released in {@link #close()}. */
private final Closer closer = Closer.create();
/** Stream factory passed to the delegated backend when a materialization is started. */
private final CheckpointStreamFactory streamFactory;
/** Current snapshot state; replaced after every successfully handled materialization. */
private ChangelogSnapshotState changelogSnapshotState;
/** ID of the checkpoint most recently started via {@code snapshot}; -1 if there was none. */
private long lastCheckpointId = -1L;
/** ID that is assigned to the next materialization (or native savepoint). */
private long materializedId = 0;
/** last accessed partitioned state. */
@SuppressWarnings("rawtypes")
private InternalKvState lastState;
/** Name of the last accessed partitioned state; caches the lookup together with lastState. */
private String lastName;
/** Updated with the state descriptor on every keyed state access. */
private final FunctionDelegationHelper functionDelegationHelper =
new FunctionDelegationHelper();
/** Metric group that snapshot results are reported to. */
private final ChangelogStateBackendMetricGroup metrics;
/**
* {@link SequenceNumber} denoting last upload range <b>start</b>, inclusive. Updated to {@link
* ChangelogSnapshotState#materializedTo} when {@link #snapshot(long, long,
* CheckpointStreamFactory, CheckpointOptions) starting snapshot}. Used to notify {@link
* #stateChangelogWriter} about changelog ranges that were confirmed or aborted by JM.
*/
@Nullable private SequenceNumber lastUploadedFrom;
/**
* {@link SequenceNumber} denoting last upload range <b>end</b>, exclusive. Updated to {@link
* StateChangelogWriter#nextSequenceNumber()} when {@link #snapshot(long, long,
* CheckpointStreamFactory, CheckpointOptions) starting snapshot}. Used to notify {@link
* #stateChangelogWriter} about changelog ranges that were confirmed or aborted by JM.
*/
@Nullable private SequenceNumber lastUploadedTo;
/** Name of the owning subtask; only used in log messages. */
private final String subtaskName;
/**
* Provides a unique ID for each state created by this backend instance. A mapping from this ID
* to state name is written once along with metadata; afterwards, only ID is written with each
* state change for efficiency.
*/
private short lastCreatedStateId = -1;
/** Checkpoint ID mapped to Materialization ID - used to notify nested backend of completion. */
private final NavigableMap<Long, Long> materializationIdByCheckpointId = new TreeMap<>();
/** Highest materialization ID restored or reported to the nested backend as completed. */
private long lastConfirmedMaterializationId = -1L;
/** last failed or cancelled materialization. */
private long lastFailedMaterializationId = -1L;
/** Notified about checkpoints, materializations and subsumed checkpoints. */
private final ChangelogTruncateHelper changelogTruncateHelper;
/**
* The local snapshot is considered complete when there is at least one successful
* materialization after rescaling.
*/
private AtomicBoolean hasCompletedMaterialization = new AtomicBoolean(false);
/** Set during restore: true when more than one state handle was restored. */
private boolean isRescaling = false;
/**
* Creates a changelog backend around the given keyed state backend. Delegates to the main
* constructor, wrapping {@code metricGroup} into a {@link ChangelogStateBackendMetricGroup} and
* supplying a fresh {@link ChangelogStateFactory}.
*
* @param keyedStateBackend the delegated keyed state backend
* @param subtaskName name of the owning subtask, used in log messages
* @param executionConfig used to initialize state descriptor serializers
* @param ttlTimeProvider time provider handed to the TTL state factory
* @param metricGroup parent metric group
* @param stateChangelogWriter writer receiving the state changes
* @param initialState state handles to restore the snapshot state from
* @param checkpointStorageWorkerView used to create output streams for materialization
*/
public ChangelogKeyedStateBackend(
AbstractKeyedStateBackend<K> keyedStateBackend,
String subtaskName,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter,
Collection<ChangelogStateBackendHandle> initialState,
CheckpointStorageWorkerView checkpointStorageWorkerView) {
this(
keyedStateBackend,
subtaskName,
executionConfig,
ttlTimeProvider,
new ChangelogStateBackendMetricGroup(metricGroup),
stateChangelogWriter,
initialState,
checkpointStorageWorkerView,
new ChangelogStateFactory());
}
/**
* Main constructor: stores the collaborators, registers the changelog writer and the delegated
* backend for closing, derives the initial snapshot state from {@code initialState} and builds
* the stream factory used for materialization.
*
* @param keyedStateBackend the delegated keyed state backend
* @param subtaskName name of the owning subtask, used in log messages
* @param executionConfig used to initialize state descriptor serializers
* @param ttlTimeProvider time provider handed to the TTL state factory
* @param metricGroup metric group that snapshot results are reported to
* @param stateChangelogWriter writer receiving the state changes
* @param initialState state handles to restore the snapshot state from
* @param checkpointStorageWorkerView used to create output streams for materialization
* @param changelogStateFactory factory of the changelog state wrappers
*/
public ChangelogKeyedStateBackend(
AbstractKeyedStateBackend<K> keyedStateBackend,
String subtaskName,
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
ChangelogStateBackendMetricGroup metricGroup,
StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter,
Collection<ChangelogStateBackendHandle> initialState,
CheckpointStorageWorkerView checkpointStorageWorkerView,
ChangelogStateFactory changelogStateFactory) {
this.keyedStateBackend = keyedStateBackend;
this.subtaskName = subtaskName;
this.executionConfig = executionConfig;
this.ttlTimeProvider = ttlTimeProvider;
this.keyValueStatesByName = new HashMap<>();
this.metrics = metricGroup;
this.changelogStateFactory = changelogStateFactory;
this.stateChangelogWriter = stateChangelogWriter;
this.lastUploadedTo = stateChangelogWriter.initialSequenceNumber();
// The lambda reads lastUploadedTo when the closer runs, i.e. the value at close time.
this.closer.register(() -> stateChangelogWriter.truncateAndClose(lastUploadedTo));
// completeRestore also sets isRescaling, lastConfirmedMaterializationId and materializedId.
this.changelogSnapshotState = completeRestore(initialState);
// Stream factory handed to the delegated backend for materialization; every stream is a
// task-owned stream regardless of the requested scope.
this.streamFactory =
new CheckpointStreamFactory() {
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
CheckpointedStateScope scope) throws IOException {
return checkpointStorageWorkerView.createTaskOwnedStateStream();
}
@Override
public boolean canFastDuplicate(
StreamStateHandle stateHandle, CheckpointedStateScope scope)
throws IOException {
return false;
}
@Override
public List<StreamStateHandle> duplicate(
List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
throws IOException {
// NOTE(review): returns null; presumably never called because canFastDuplicate
// always answers false - confirm against the callers.
return null;
}
};
// NOTE(review): registered after the writer; the closing order of the two depends on the
// Closer implementation (presumably reverse registration order) - confirm.
this.closer.register(keyedStateBackend);
this.changelogTruncateHelper = new ChangelogTruncateHelper(stateChangelogWriter);
}
// -------------------- CheckpointableKeyedStateBackend --------------------------------
/** Returns the key group range reported by the delegated keyed state backend. */
@Override
public KeyGroupRange getKeyGroupRange() {
    final KeyGroupRange delegatedRange = keyedStateBackend.getKeyGroupRange();
    return delegatedRange;
}
/**
* Closes everything registered with the internal closer: the changelog writer (truncated up to
* the last uploaded sequence number), the delegated backend, the state change loggers and any
* closeable added through {@link #registerCloseable(Closeable)}.
*
* @throws IOException if closing a registered resource fails
*/
@Override
public void close() throws IOException {
closer.close();
}
/**
* Sets the current key on the delegated keyed state backend.
*
* @param newKey the key to make current
*/
@Override
public void setCurrentKey(K newKey) {
keyedStateBackend.setCurrentKey(newKey);
}
/** Returns the current key of the delegated keyed state backend. */
@Override
public K getCurrentKey() {
    final K activeKey = keyedStateBackend.getCurrentKey();
    return activeKey;
}
/** Returns the key serializer of the delegated keyed state backend. */
@Override
public TypeSerializer<K> getKeySerializer() {
    final TypeSerializer<K> delegatedKeySerializer = keyedStateBackend.getKeySerializer();
    return delegatedKeySerializer;
}
/**
 * Returns the keys of the given state and namespace as provided by the delegated backend.
 *
 * @param state name of the state
 * @param namespace namespace to look in
 */
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
    final Stream<K> delegatedKeys = keyedStateBackend.getKeys(state, namespace);
    return delegatedKeys;
}
/**
 * Returns the key/namespace pairs of the given state as provided by the delegated backend.
 *
 * @param state name of the state
 */
@Override
public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
    final Stream<Tuple2<K, N>> delegatedPairs = keyedStateBackend.getKeysAndNamespaces(state);
    return delegatedPairs;
}
/**
* Disposes the delegated backend, then drops the last-accessed-state cache and the registered
* key-value states, and finally disposes the changelog state factory.
*/
@Override
public void dispose() {
keyedStateBackend.dispose();
// reset the one-entry cache used by getPartitionedState
lastName = null;
lastState = null;
keyValueStatesByName.clear();
changelogStateFactory.dispose();
}
/**
* Registers the listener with the delegated keyed state backend.
*
* @param listener the listener to register
*/
@Override
public void registerKeySelectionListener(KeySelectionListener<K> listener) {
keyedStateBackend.registerKeySelectionListener(listener);
}
/**
 * Removes the listener from the delegated keyed state backend.
 *
 * @param listener the listener to remove
 * @return whatever the delegated backend reports for the removal
 */
@Override
public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) {
    final boolean removed = keyedStateBackend.deregisterKeySelectionListener(listener);
    return removed;
}
/**
* Applies the function to all keys by delegating to the wrapped backend, but hands it this
* backend's {@code getPartitionedState} so that states are resolved through this changelog
* backend.
*
* @param namespace the namespace of the state
* @param namespaceSerializer serializer of the namespace
* @param stateDescriptor descriptor of the state to apply the function to
* @param function the function to apply
* @throws Exception if the delegated backend fails
*/
@Override
public <N, S extends State, T> void applyToAllKeys(
N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, T> stateDescriptor,
KeyedStateFunction<K, S> function)
throws Exception {
keyedStateBackend.applyToAllKeys(
namespace,
namespaceSerializer,
stateDescriptor,
function,
this::getPartitionedState);
}
/**
 * Returns the state for the given descriptor with its namespace set to {@code namespace}.
 *
 * <p>The lookup goes through three levels: the last accessed state, the states already
 * registered by name, and finally {@link #getOrCreateKeyedState}. The resolved state becomes
 * the new last accessed state.
 *
 * @param namespace the namespace to set on the state, must not be null
 * @param namespaceSerializer serializer of the namespace, used if the state must be created
 * @param stateDescriptor descriptor identifying the state by name
 * @return the state with the namespace applied
 * @throws Exception if creating the state fails
 */
@Override
@SuppressWarnings("unchecked")
public <N, S extends State> S getPartitionedState(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, ?> stateDescriptor)
        throws Exception {
    checkNotNull(namespace, "Namespace");
    final String requestedName = stateDescriptor.getName();
    final boolean sameAsLastAccess = lastName != null && lastName.equals(requestedName);
    if (!sameAsLastAccess) {
        final InternalKvState<K, ?, ?> registered = keyValueStatesByName.get(requestedName);
        if (registered == null) {
            // not known yet: create it and remember it as the last accessed state
            final S created = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
            final InternalKvState<K, N, ?> createdKvState = (InternalKvState<K, N, ?>) created;
            lastName = requestedName;
            lastState = createdKvState;
            createdKvState.setCurrentNamespace(namespace);
            return created;
        }
        // known by name: promote it to the last accessed state
        lastState = registered;
        lastState.setCurrentNamespace(namespace);
        lastName = requestedName;
        functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S) registered;
    }
    // fast path: same state as on the previous call
    lastState.setCurrentNamespace(namespace);
    return (S) lastState;
}
/**
* Starts a snapshot for the given checkpoint.
*
* <p>Savepoints are forwarded to {@code nativeSavepoint}. For checkpoints, the upload range
* ({@code lastUploadedFrom} .. {@code lastUploadedTo}) is recorded, the truncate helper is
* notified, the checkpoint is mapped to the current materialization ID, and the changelog writer
* is asked to persist all changes starting from the last materialized sequence number. The
* persisted delta is combined with the current snapshot state into the result.
*
* @param checkpointId ID of the checkpoint
* @param timestamp checkpoint timestamp; only forwarded for savepoints
* @param streamFactory stream factory; only forwarded for savepoints (it shadows the field of the
*     same name)
* @param checkpointOptions options deciding between the savepoint and checkpoint path
* @return future providing the snapshot result
* @throws Exception if triggering the snapshot fails
*/
@Nonnull
@Override
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions)
throws Exception {
if (checkpointOptions.getCheckpointType().isSavepoint()) {
return nativeSavepoint(checkpointId, timestamp, streamFactory, checkpointOptions);
}
// The range to upload may overlap with the previous one(s). To reuse them, we could store
// the previous results either here in the backend or in the writer. However,
// materialization may truncate only a part of the previous result and the backend would
// have to split it somehow for the former option, so the latter is used.
lastCheckpointId = checkpointId;
lastUploadedFrom = changelogSnapshotState.lastMaterializedTo();
lastUploadedTo = stateChangelogWriter.nextSequenceNumber();
changelogTruncateHelper.checkpoint(checkpointId, lastUploadedTo);
LOG.info(
"snapshot of {} for checkpoint {}, change range: {}..{}, materialization ID {}",
subtaskName,
checkpointId,
lastUploadedFrom,
lastUploadedTo,
changelogSnapshotState.getMaterializationID());
// Capture the current snapshot state: the field may be replaced by a materialization
// before the asynchronous part below runs.
ChangelogSnapshotState changelogStateBackendStateCopy = changelogSnapshotState;
// remembered so that notifyCheckpointComplete can notify the nested backend
materializationIdByCheckpointId.put(
checkpointId, changelogStateBackendStateCopy.materializationID);
return toRunnableFuture(
stateChangelogWriter
.persist(lastUploadedFrom, checkpointId)
.thenApply(
delta ->
buildSnapshotResult(
checkpointId,
delta,
changelogStateBackendStateCopy))
// NOTE(review): snapshotResult is null when the stage failed; confirm that
// reportSnapshotResult tolerates null.
.whenComplete(
(snapshotResult, throwable) ->
metrics.reportSnapshotResult(snapshotResult))
.thenApply(this::castSnapshotResult));
}
/**
* Takes a native savepoint by snapshotting the delegated backend under a fresh materialization
* ID and wrapping its result into a changelog handle without any changelog delta.
*
* @param checkpointId ID of the savepoint
* @param timestamp savepoint timestamp, forwarded to the delegated backend
* @param streamFactory stream factory, forwarded to the delegated backend
* @param checkpointOptions options, forwarded to the delegated backend
* @return future that runs the delegated snapshot (if not done yet) and builds the result
* @throws UnsupportedOperationException if the sharing files strategy is not NO_SHARING
* @throws Exception if triggering the delegated snapshot fails
*/
private RunnableFuture<SnapshotResult<KeyedStateHandle>> nativeSavepoint(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions)
throws Exception {
SnapshotType.SharingFilesStrategy sharingFilesStrategy =
checkpointOptions.getCheckpointType().getSharingFilesStrategy();
if (sharingFilesStrategy != SnapshotType.SharingFilesStrategy.NO_SHARING) {
throw new UnsupportedOperationException(
"ChangelogKeyedStateBackend doesn't support native savepoint with SharingFilesStrategy: "
+ sharingFilesStrategy);
}
// the savepoint consumes a materialization ID of its own
long materializationID = materializedId++;
// For NO_SHARING native savepoint, trigger delegated one
RunnableFuture<SnapshotResult<KeyedStateHandle>> delegatedSnapshotResult =
keyedStateBackend.snapshot(
materializationID, timestamp, streamFactory, checkpointOptions);
// remembered so that notifyCheckpointComplete can notify the nested backend
materializationIdByCheckpointId.put(checkpointId, materializationID);
return new FutureTask<SnapshotResult<KeyedStateHandle>>(
() -> {
SnapshotResult<KeyedStateHandle> result =
FutureUtils.runIfNotDoneAndGet(delegatedSnapshotResult);
return castSnapshotResult(
buildSnapshotResult(
checkpointId,
SnapshotResult.empty(),
new ChangelogSnapshotState(
getMaterializedResult(result), materializationID)));
}) {
// The overrides below combine the state of this task with the delegated future.
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// NOTE(review): because of the short-circuit, this task itself is not cancelled
// when cancelling the delegated future returns false - confirm this is intended.
return delegatedSnapshotResult.cancel(mayInterruptIfRunning)
&& super.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return delegatedSnapshotResult.isCancelled() && super.isCancelled();
}
@Override
public boolean isDone() {
return delegatedSnapshotResult.isDone() && super.isDone();
}
};
}
/**
 * Re-types a snapshot result as a result of {@link KeyedStateHandle}. This is an unchecked
 * cast; no conversion of the content takes place.
 */
@SuppressWarnings("unchecked")
private SnapshotResult<KeyedStateHandle> castSnapshotResult(SnapshotResult<?> snapshotResult) {
    final SnapshotResult<KeyedStateHandle> retyped =
            (SnapshotResult<KeyedStateHandle>) snapshotResult;
    return retyped;
}
/**
 * Combines the given snapshot state with the freshly persisted changelog delta into the
 * snapshot result of a checkpoint.
 *
 * <p>The result is empty when there is neither materialized state nor any changelog. A
 * task-local handle is attached only if local state may be used (no rescaling, or a
 * materialization completed since) and local state exists (local materialized snapshot or a
 * local delta). Otherwise only the job-manager-owned handle is returned.
 *
 * <p>A {@code null} delta is treated as "no new changes" for both the job-manager-owned and
 * the task-local part; the method never dereferences it unguarded.
 *
 * @param checkpointId ID of the checkpoint the result belongs to
 * @param delta changelog persisted for this checkpoint, may be null
 * @param changelogStateBackendStateCopy snapshot state captured when the checkpoint started
 * @return the snapshot result for the checkpoint
 */
private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult(
        long checkpointId,
        SnapshotResult<? extends ChangelogStateHandle> delta,
        ChangelogSnapshotState changelogStateBackendStateCopy) {
    // collections don't change once started and handles are immutable
    List<ChangelogStateHandle> prevDeltaCopy =
            new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());

    // Resolve both parts of the delta once, null-safely.
    final ChangelogStateHandle jmDelta =
            delta == null ? null : delta.getJobManagerOwnedSnapshot();
    final ChangelogStateHandle localDelta = delta == null ? null : delta.getTaskLocalSnapshot();

    long persistedSizeOfThisCheckpoint = 0L;
    if (jmDelta != null && jmDelta.getStateSize() > 0) {
        prevDeltaCopy.add(jmDelta);
        persistedSizeOfThisCheckpoint += jmDelta.getCheckpointedSize();
    }

    if (prevDeltaCopy.isEmpty()
            && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
        return SnapshotResult.empty();
    }

    // The job-manager-owned handle is the same with or without local state.
    ChangelogStateBackendHandleImpl jmHandle =
            new ChangelogStateBackendHandleImpl(
                    changelogStateBackendStateCopy.getMaterializedSnapshot(),
                    prevDeltaCopy,
                    getKeyGroupRange(),
                    checkpointId,
                    changelogStateBackendStateCopy.materializationID,
                    persistedSizeOfThisCheckpoint);

    if ((!isRescaling || hasCompletedMaterialization.get())
            && (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()
                    || localDelta != null)) {
        List<ChangelogStateHandle> localDeltaCopy =
                new ArrayList<>(
                        changelogStateBackendStateCopy.getLocalRestoredNonMaterialized());
        if (localDelta != null && localDelta.getStateSize() > 0) {
            localDeltaCopy.add(localDelta);
        }
        return SnapshotResult.withLocalState(
                jmHandle,
                new ChangelogStateBackendLocalHandle(
                        changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
                        localDeltaCopy,
                        jmHandle));
    }
    return SnapshotResult.of(jmHandle);
}
/**
 * Creates the priority queue in the delegated backend and returns its changelog wrapper.
 *
 * <p>If the changelog state factory already knows a priority queue of that name, it is
 * re-pointed to the newly created delegate and its meta info is refreshed; otherwise a new
 * wrapper with its own state change logger is created.
 *
 * @param stateName name of the priority queue state
 * @param byteOrderedElementSerializer serializer of the queue elements
 * @return the changelog priority queue wrapping the delegated one
 */
@Nonnull
@Override
@SuppressWarnings("unchecked")
public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
        KeyGroupedInternalPriorityQueue<T> create(
                @Nonnull String stateName,
                @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
    final KeyGroupedInternalPriorityQueue<T> delegatedQueue =
            keyedStateBackend.create(stateName, byteOrderedElementSerializer);
    final ChangelogKeyGroupedPriorityQueue<T> existingQueue =
            (ChangelogKeyGroupedPriorityQueue<T>)
                    changelogStateFactory.getExistingState(
                            stateName, BackendStateType.PRIORITY_QUEUE);
    if (existingQueue != null) {
        updateChangelogState(
                existingQueue, delegatedQueue, stateName, byteOrderedElementSerializer);
        return existingQueue;
    }
    final ChangelogKeyGroupedPriorityQueue<T> createdQueue =
            changelogStateFactory.create(
                    stateName,
                    delegatedQueue,
                    getPqStateChangeLogger(stateName, byteOrderedElementSerializer),
                    byteOrderedElementSerializer);
    return createdQueue;
}
/**
 * Refreshes an existing changelog priority queue: its state change logger receives new meta
 * info built from the name and serializer, and the queue is pointed to the new delegate.
 *
 * @param queue the existing changelog priority queue
 * @param priorityQueue the newly created delegated queue
 * @param stateName name of the priority queue state
 * @param byteOrderedElementSerializer serializer of the queue elements
 */
private <T> void updateChangelogState(
        ChangelogKeyGroupedPriorityQueue<T> queue,
        KeyGroupedInternalPriorityQueue<T> priorityQueue,
        String stateName,
        TypeSerializer<T> byteOrderedElementSerializer) {
    final RegisteredPriorityQueueStateBackendMetaInfo<T> refreshedMetaInfo =
            new RegisteredPriorityQueueStateBackendMetaInfo<>(
                    stateName, byteOrderedElementSerializer);
    ((PriorityQueueStateChangeLoggerImpl<K, T>) queue.getStateChangeLogger())
            .setMetaInfo(refreshedMetaInfo);
    queue.setDelegatedState(priorityQueue);
}
/**
 * Creates the state change logger of a priority queue state. The logger is given the next
 * free state ID and is registered with the closer of this backend.
 *
 * @param stateName name of the priority queue state
 * @param byteOrderedElementSerializer serializer of the queue elements
 * @return the new logger
 */
private <T> StateChangeLogger<T, Void> getPqStateChangeLogger(
        String stateName, TypeSerializer<T> byteOrderedElementSerializer) {
    final RegisteredPriorityQueueStateBackendMetaInfo<T> queueMetaInfo =
            new RegisteredPriorityQueueStateBackendMetaInfo<>(
                    stateName, byteOrderedElementSerializer);
    final PriorityQueueStateChangeLoggerImpl<K, T> pqLogger =
            new PriorityQueueStateChangeLoggerImpl<>(
                    byteOrderedElementSerializer,
                    keyedStateBackend.getKeyContext(),
                    stateChangelogWriter,
                    queueMetaInfo,
                    ++lastCreatedStateId);
    closer.register(pqLogger);
    return pqLogger;
}
/** Returns the number of key-value state entries reported by the delegated backend. */
@VisibleForTesting
@Override
public int numKeyValueStateEntries() {
    final int delegatedEntryCount = keyedStateBackend.numKeyValueStateEntries();
    return delegatedEntryCount;
}
/** Returns the answer of the delegated backend on whether KV state may be reused. */
@Override
public boolean isSafeToReuseKVState() {
    final boolean delegateAllowsReuse = keyedStateBackend.isSafeToReuseKVState();
    return delegateAllowsReuse;
}
/**
 * Returns the savepoint resources of the delegated backend.
 *
 * @throws Exception if the delegated backend fails
 */
@Nonnull
@Override
public SavepointResources<K> savepoint() throws Exception {
    final SavepointResources<K> delegatedResources = keyedStateBackend.savepoint();
    return delegatedResources;
}
// -------------------- CheckpointListener --------------------------------
/**
 * Handles the completion of a checkpoint.
 *
 * <p>If the checkpoint is the most recently started one, the upload range is confirmed with
 * the changelog writer. If the checkpoint maps to a materialization newer than the last
 * confirmed one, the nested backend is notified with the materialization ID. Finally, all
 * mappings up to and including this checkpoint are dropped.
 *
 * @param checkpointId ID of the completed checkpoint
 * @throws Exception if the writer or the nested backend fails
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    final boolean concernsLastSnapshot = checkpointId == lastCheckpointId;
    if (concernsLastSnapshot) {
        // Notify the writer so that it can re-use the previous uploads. Do NOT notify it about
        // a range status change if it is not relevant anymore. Otherwise, it could CONFIRM a
        // newer upload instead of the previous one. This newer upload could then be re-used
        // while in fact JM has discarded its results.
        // This might change if the log ownership changes (the method won't likely be needed).
        stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo, checkpointId);
    }
    final Long completedMaterialization = materializationIdByCheckpointId.remove(checkpointId);
    if (completedMaterialization != null
            && completedMaterialization > lastConfirmedMaterializationId) {
        keyedStateBackend.notifyCheckpointComplete(completedMaterialization);
        lastConfirmedMaterializationId = completedMaterialization;
    }
    // older checkpoints can no longer be confirmed individually
    materializationIdByCheckpointId.headMap(checkpointId, true).clear();
}
/**
 * Handles the abortion of a checkpoint: if it is the most recently started one, the changelog
 * writer is asked to reset the upload range; otherwise nothing happens.
 *
 * @param checkpointId ID of the aborted checkpoint
 * @throws Exception if the writer fails
 */
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
    // TODO: Consider notifying nested state backend about checkpoint abortion (FLINK-25850)
    final boolean concernsLastSnapshot = checkpointId == lastCheckpointId;
    if (!concernsLastSnapshot) {
        return;
    }
    // Notify the writer so that it can clean up. Do NOT notify it about a range status
    // change if it is not relevant anymore. Otherwise, it could DISCARD a newer upload
    // instead of the previous one. Rely on truncation for the cleanup in this case.
    // This might change if the log ownership changes (the method won't likely be needed).
    stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo, checkpointId);
}
// -------- Methods not simply delegating to wrapped state backend ---------
/**
 * Returns the keyed state registered under the descriptor's name, creating it on first use.
 *
 * <p>A new state is created through the TTL state factory (with this backend as the state
 * backend), wrapped by the latency tracking factory, cached by name and published as
 * queryable state if enabled. The function delegation helper is updated on every call.
 *
 * @param namespaceSerializer serializer of the namespace, must not be null
 * @param stateDescriptor descriptor of the requested state
 * @return the existing or newly created state
 * @throws Exception if creating the state fails
 */
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, T> S getOrCreateKeyedState(
        TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor)
        throws Exception {
    checkNotNull(namespaceSerializer, "Namespace serializer");
    checkNotNull(
            getKeySerializer(),
            "State key serializer has not been configured in the config. "
                    + "This operation cannot use partitioned state.");
    final String stateName = stateDescriptor.getName();
    InternalKvState<K, ?, ?> resolvedState = keyValueStatesByName.get(stateName);
    if (resolvedState == null) {
        final boolean serializerMissing = !stateDescriptor.isSerializerInitialized();
        if (serializerMissing) {
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);
        }
        resolvedState =
                LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(
                        TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                                namespaceSerializer, stateDescriptor, this, ttlTimeProvider),
                        stateDescriptor,
                        keyedStateBackend.getLatencyTrackingStateConfig());
        keyValueStatesByName.put(stateName, resolvedState);
        keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, resolvedState);
    }
    functionDelegationHelper.addOrUpdate(stateDescriptor);
    return (S) resolvedState;
}
/**
 * Creates or updates the state in the delegated backend and returns its changelog wrapper.
 *
 * <p>If the changelog state factory already knows a key-value state of that name, it is
 * refreshed with the new delegate and descriptor; otherwise a new wrapper with its own state
 * change logger is created.
 *
 * @param namespaceSerializer serializer of the namespace
 * @param stateDesc descriptor of the state
 * @param snapshotTransformFactory snapshot transformer factory of the state
 * @return the changelog state wrapping the delegated state
 * @throws Exception if the delegated backend fails
 */
@Nonnull
@Override
@SuppressWarnings("unchecked")
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
        @Nonnull TypeSerializer<N> namespaceSerializer,
        @Nonnull StateDescriptor<S, SV> stateDesc,
        @Nonnull
                StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                        snapshotTransformFactory)
        throws Exception {
    final InternalKvState<K, N, SV> delegatedState =
            keyedStateBackend.createOrUpdateInternalState(
                    namespaceSerializer, stateDesc, snapshotTransformFactory);
    final ChangelogState existingState =
            changelogStateFactory.getExistingState(
                    stateDesc.getName(), BackendStateType.KEY_VALUE);
    if (existingState != null) {
        updateChangelogState(
                existingState, delegatedState, stateDesc, snapshotTransformFactory);
        return (IS) existingState;
    }
    final ChangelogState createdState =
            changelogStateFactory.create(
                    stateDesc,
                    delegatedState,
                    getKvStateChangeLogger(delegatedState, stateDesc, snapshotTransformFactory),
                    keyedStateBackend /* pass the nested backend as key context so that it get key updates on recovery*/);
    return (IS) createdState;
}
/**
 * Refreshes an existing changelog key-value state: it is pointed to the new delegate, and its
 * state change logger receives new meta info, TTL config and default value taken from the
 * descriptor.
 *
 * @param changelogState the existing changelog state
 * @param state the newly created delegated state
 * @param stateDesc descriptor of the state
 * @param snapshotTransformFactory snapshot transformer factory of the state
 */
@SuppressWarnings("unchecked")
private <SV, SEV, S extends State, IS extends InternalKvState<K, N, SV>, N>
        void updateChangelogState(
                ChangelogState changelogState,
                InternalKvState<K, N, SV> state,
                StateDescriptor<S, SV> stateDesc,
                StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                        snapshotTransformFactory) {
    final RegisteredKeyValueStateBackendMetaInfo<N, SV> refreshedMetaInfo =
            new RegisteredKeyValueStateBackendMetaInfo<>(
                    stateDesc.getType(),
                    stateDesc.getName(),
                    state.getNamespaceSerializer(),
                    state.getValueSerializer(),
                    (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>)
                            snapshotTransformFactory);
    final AbstractChangelogState<K, N, SV, IS> typedChangelogState =
            (AbstractChangelogState<K, N, SV, IS>) changelogState;
    typedChangelogState.setDelegatedState(state);
    final KvStateChangeLoggerImpl<K, SV, N> kvLogger =
            (KvStateChangeLoggerImpl<K, SV, N>) typedChangelogState.getStateChangeLogger();
    kvLogger.setMetaInfo(refreshedMetaInfo)
            .setStateTtlConfig(stateDesc.getTtlConfig())
            .setDefaultValue(stateDesc.getDefaultValue());
}
/**
 * Creates the state change logger of a key-value state. The logger is given the next free
 * state ID and is registered with the closer of this backend.
 *
 * @param state the delegated state providing the serializers
 * @param stateDesc descriptor providing type, name, TTL config and default value
 * @param snapshotTransformFactory snapshot transformer factory of the state
 * @return the new logger
 */
@SuppressWarnings("unchecked")
private <SV, SEV, S extends State, N> KvStateChangeLogger<SV, N> getKvStateChangeLogger(
        InternalKvState<K, N, SV> state,
        StateDescriptor<S, SV> stateDesc,
        StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
    final RegisteredKeyValueStateBackendMetaInfo<N, SV> kvMetaInfo =
            new RegisteredKeyValueStateBackendMetaInfo<>(
                    stateDesc.getType(),
                    stateDesc.getName(),
                    state.getNamespaceSerializer(),
                    state.getValueSerializer(),
                    (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>)
                            snapshotTransformFactory);
    final KvStateChangeLoggerImpl<K, SV, N> kvLogger =
            new KvStateChangeLoggerImpl<>(
                    state.getKeySerializer(),
                    state.getNamespaceSerializer(),
                    state.getValueSerializer(),
                    keyedStateBackend.getKeyContext(),
                    stateChangelogWriter,
                    kvMetaInfo,
                    stateDesc.getTtlConfig(),
                    stateDesc.getDefaultValue(),
                    ++lastCreatedStateId);
    closer.register(kvLogger);
    return kvLogger;
}
/**
* Registers a resource with the closer of this backend so that it is closed in {@link #close()}.
*
* @param closeable the resource to register; declared nullable, null handling is left to the
*     closer
*/
public void registerCloseable(@Nullable Closeable closeable) {
closer.register(closeable);
}
/**
 * Builds the initial snapshot state from the restored handles.
 *
 * <p>Collects the materialized and non-materialized handles (remote, plus local ones for
 * local handles), marks the restore as rescaling if more than one handle was given, and takes
 * the maximum materialization ID of all handles. That maximum becomes the last confirmed
 * materialization ID; the next materialization gets the following ID. Local state is only
 * carried over when not rescaling and when some local handle exists.
 *
 * @param stateHandles the restored handles; null elements are skipped
 * @return the snapshot state to start from
 */
private ChangelogSnapshotState completeRestore(
        Collection<ChangelogStateBackendHandle> stateHandles) {
    long maxMaterializationId = 0L;
    final List<KeyedStateHandle> remoteMaterialized = new ArrayList<>();
    final List<ChangelogStateHandle> remoteNonMaterialized = new ArrayList<>();
    final List<KeyedStateHandle> localMaterialized = new ArrayList<>();
    final List<ChangelogStateHandle> localNonMaterialized = new ArrayList<>();
    isRescaling = stateHandles.size() > 1;
    for (ChangelogStateBackendHandle handle : stateHandles) {
        if (handle == null) {
            continue;
        }
        if (handle instanceof ChangelogStateBackendLocalHandle) {
            final ChangelogStateBackendLocalHandle local =
                    (ChangelogStateBackendLocalHandle) handle;
            remoteMaterialized.addAll(local.getRemoteMaterializedStateHandles());
            remoteNonMaterialized.addAll(local.getRemoteNonMaterializedStateHandles());
            localMaterialized.addAll(local.getMaterializedStateHandles());
            localNonMaterialized.addAll(local.getNonMaterializedStateHandles());
        } else {
            remoteMaterialized.addAll(handle.getMaterializedStateHandles());
            remoteNonMaterialized.addAll(handle.getNonMaterializedStateHandles());
        }
        // choose max materializationID to handle rescaling
        maxMaterializationId = Math.max(maxMaterializationId, handle.getMaterializationID());
    }
    this.lastConfirmedMaterializationId = maxMaterializationId;
    this.materializedId = maxMaterializationId + 1;

    final boolean noLocalState = localMaterialized.isEmpty() && localNonMaterialized.isEmpty();
    if (isRescaling || noLocalState) {
        return new ChangelogSnapshotState(
                remoteMaterialized,
                remoteNonMaterialized,
                stateChangelogWriter.initialSequenceNumber(),
                maxMaterializationId);
    }
    return new ChangelogSnapshotState(
            remoteMaterialized,
            localMaterialized,
            remoteNonMaterialized,
            localNonMaterialized,
            stateChangelogWriter.initialSequenceNumber(),
            maxMaterializationId);
}
/**
 * Initialize state materialization so that materialized data can be persisted durably and
 * included into the checkpoint.
 *
 * <p>No materialization is started while the previous one is neither confirmed nor
 * failed/cancelled, or when the changelog has not advanced past the last materialized
 * sequence number.
 *
 * <p>This method is not thread safe. It should be called either under a lock or through task
 * mailbox executor.
 *
 * @return a {@link MaterializationRunnable} carrying the snapshot future of the underlying
 *     state backend, the materialization ID and the {@link SequenceNumber} up to which the
 *     changelog is covered; or an empty optional if no materialization was started
 */
@Override
public Optional<MaterializationRunnable> initMaterialization() throws Exception {
    final long previousMaterializationId = materializedId - 1;
    final boolean previousUnresolved =
            lastConfirmedMaterializationId < previousMaterializationId
                    && lastFailedMaterializationId < previousMaterializationId;
    if (previousUnresolved) {
        // SharedStateRegistry potentially requires that the checkpoint's dependency on the
        // shared file be continuous, it will be broken if we trigger a new materialization
        // before the previous one has either confirmed or failed. See discussion in
        // https://github.com/apache/flink/pull/22669#issuecomment-1593370772 .
        LOG.info(
                "materialization:{} not confirmed or failed or cancelled, skip trigger new one.",
                previousMaterializationId);
        return Optional.empty();
    }

    final SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
    final SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo();
    LOG.info(
            "Initialize Materialization. Current changelog writers last append to sequence number {}",
            upTo);

    if (upTo.compareTo(lastMaterializedTo) <= 0) {
        LOG.debug(
                "Skip materialization, last materialized to {} : last log to {}",
                lastMaterializedTo,
                upTo);
        return Optional.empty();
    }

    LOG.info("Starting materialization from {} : {}", lastMaterializedTo, upTo);
    // This ID is not needed for materialization; But since we are re-using the
    // streamFactory that is designed for state backend snapshot, which requires unique
    // checkpoint ID. A faked materialized Id is provided here.
    final long materializationID = materializedId++;
    final RunnableFuture<SnapshotResult<KeyedStateHandle>> materializationFuture =
            keyedStateBackend.snapshot(
                    materializationID,
                    System.currentTimeMillis(),
                    // TODO: implement its own streamFactory.
                    streamFactory,
                    CHECKPOINT_OPTIONS);
    final MaterializationRunnable materializationRunnable =
            new MaterializationRunnable(materializationFuture, materializationID, upTo);
    // log metadata after materialization is triggered
    changelogStateFactory.resetAllWritingMetaFlags();
    return Optional.of(materializationRunnable);
}
/**
 * Installs the result of a finished materialization as the new snapshot state, marks that a
 * materialization has completed, and tells the truncate helper up to which sequence number
 * the changelog is materialized. The new state carries local handles only if the result has
 * a task-local snapshot.
 *
 * <p>This method is not thread safe. It should be called either under a lock or through task
 * mailbox executor.
 *
 * @param materializedSnapshot snapshot produced by the materialization
 * @param materializationID ID of the materialization
 * @param upTo sequence number up to which the changelog is covered by the snapshot
 */
@Override
public void handleMaterializationResult(
        SnapshotResult<KeyedStateHandle> materializedSnapshot,
        long materializationID,
        SequenceNumber upTo) {
    LOG.info(
            "Task {} finishes materialization, updates the snapshotState upTo {} : {}",
            subtaskName,
            upTo,
            materializedSnapshot);
    final boolean hasLocalSnapshot = materializedSnapshot.getTaskLocalSnapshot() != null;
    if (hasLocalSnapshot) {
        changelogSnapshotState =
                new ChangelogSnapshotState(
                        getMaterializedResult(materializedSnapshot),
                        getLocalMaterializedResult(materializedSnapshot),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        upTo,
                        materializationID);
    } else {
        changelogSnapshotState =
                new ChangelogSnapshotState(
                        getMaterializedResult(materializedSnapshot),
                        Collections.emptyList(),
                        upTo,
                        materializationID);
    }
    hasCompletedMaterialization.set(true);
    changelogTruncateHelper.materialized(upTo);
}
/**
 * Records that a materialization failed or was cancelled.
 *
 * <p>The highest failed/cancelled materialization ID seen so far is remembered; a smaller ID
 * never lowers it. {@code cause} is not used by this method.
 *
 * @param materializationID ID of the failed or cancelled materialization
 * @param upTo sequence number the materialization was supposed to cover (only logged)
 * @param cause reason for the failure or cancellation
 */
@Override
public void handleMaterializationFailureOrCancellation(
        long materializationID, SequenceNumber upTo, Throwable cause) {
    LOG.info(
            "Task {} failed or cancelled materialization:{} which is upTo:{}",
            subtaskName,
            materializationID,
            upTo);
    // Only ever move the marker forward.
    if (materializationID > lastFailedMaterializationId) {
        lastFailedMaterializationId = materializationID;
    }
}
// TODO: this method may change after the ownership PR
/**
 * Wraps the job-manager-owned part of {@code materializedSnapshot} into a list.
 *
 * @return a single-element list with the job-manager-owned handle, or an empty list if the
 *     snapshot has none
 */
private List<KeyedStateHandle> getMaterializedResult(
        @Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) {
    final KeyedStateHandle handle = materializedSnapshot.getJobManagerOwnedSnapshot();
    if (handle == null) {
        return emptyList();
    }
    return singletonList(handle);
}
/**
 * Wraps the task-local part of {@code materializedSnapshot} into a list.
 *
 * @return a single-element list with the task-local handle, or an empty list if the snapshot
 *     has none
 */
private List<KeyedStateHandle> getLocalMaterializedResult(
        @Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) {
    final KeyedStateHandle handle = materializedSnapshot.getTaskLocalSnapshot();
    if (handle == null) {
        return emptyList();
    }
    return singletonList(handle);
}
/**
 * Forwards the lookup to the wrapped {@code keyedStateBackend}, passing {@code recursive}
 * through unchanged.
 */
@Override
public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
    final KeyedStateBackend<K> delegated =
            keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
    return delegated;
}
/**
 * Passes the subsumption notification for {@code checkpointId} on to the changelog truncate
 * helper.
 */
@Override
public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
    this.changelogTruncateHelper.checkpointSubsumed(checkpointId);
}
/**
 * Creates a {@link ChangelogRestoreTarget} backed by this backend.
 *
 * <p>The returned target creates (or re-uses) changelog-wrapped keyed states and priority
 * queues on top of the nested {@code keyedStateBackend}, and reports this backend as the
 * restored backend.
 */
public ChangelogRestoreTarget<K> getChangelogRestoreTarget() {
    return new ChangelogRestoreTarget<K>() {

        @Override
        public KeyGroupRange getKeyGroupRange() {
            return ChangelogKeyedStateBackend.this.getKeyGroupRange();
        }

        @Override
        @SuppressWarnings("unchecked")
        public <N, S extends State, V> S createKeyedState(
                TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
                throws Exception {
            // The nested backend's state is created/updated first; the changelog wrapper
            // is then either refreshed (if already registered) or created around it.
            final InternalKvState<K, N, V> delegateState =
                    keyedStateBackend.createOrUpdateInternalState(
                            namespaceSerializer, stateDescriptor, noTransform(), true);

            ChangelogState wrapper =
                    changelogStateFactory.getExistingState(
                            stateDescriptor.getName(),
                            StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
            if (wrapper != null) {
                updateChangelogState(wrapper, delegateState, stateDescriptor, noTransform());
            } else {
                // The nested backend is passed as key context so that it gets key updates
                // on recovery.
                wrapper =
                        changelogStateFactory.create(
                                stateDescriptor,
                                delegateState,
                                getKvStateChangeLogger(
                                        delegateState, stateDescriptor, noTransform()),
                                keyedStateBackend);
            }

            functionDelegationHelper.addOrUpdate(stateDescriptor);
            return (S) wrapper;
        }

        @Nonnull
        @Override
        @SuppressWarnings("unchecked")
        public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
                KeyGroupedInternalPriorityQueue<T> createPqState(
                        @Nonnull String stateName,
                        @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
            // Same pattern as for keyed state: nested queue first, then the wrapper.
            final KeyGroupedInternalPriorityQueue<T> delegateQueue =
                    keyedStateBackend.create(stateName, byteOrderedElementSerializer, true);

            ChangelogKeyGroupedPriorityQueue<T> wrapper =
                    (ChangelogKeyGroupedPriorityQueue<T>)
                            changelogStateFactory.getExistingState(
                                    stateName,
                                    StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
            if (wrapper != null) {
                updateChangelogState(
                        wrapper, delegateQueue, stateName, byteOrderedElementSerializer);
            } else {
                wrapper =
                        changelogStateFactory.create(
                                stateName,
                                delegateQueue,
                                getPqStateChangeLogger(stateName, byteOrderedElementSerializer),
                                byteOrderedElementSerializer);
            }
            return wrapper;
        }

        @Override
        public ChangelogState getExistingState(String name, BackendStateType type) {
            return changelogStateFactory.getExistingState(name, type);
        }

        @Override
        public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
            return ChangelogKeyedStateBackend.this;
        }
    };
}
/**
 * Exposes a {@link CompletableFuture} through the {@link RunnableFuture} interface.
 *
 * <p>{@code run()} does not compute anything; it merely waits for {@code f} via
 * {@link CompletableFuture#join()}. All other methods delegate directly to {@code f}.
 *
 * @param f the future to adapt
 * @return a {@link RunnableFuture} view of {@code f}
 */
private static <T> RunnableFuture<T> toRunnableFuture(CompletableFuture<T> f) {
    final class CompletableFutureView implements RunnableFuture<T> {

        @Override
        public void run() {
            // Block until the wrapped future is complete.
            f.join();
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            return f.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return f.get(timeout, unit);
        }

        @Override
        public boolean isDone() {
            return f.isDone();
        }

        @Override
        public boolean isCancelled() {
            return f.isCancelled();
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return f.cancel(mayInterruptIfRunning);
        }
    }

    return new CompletableFutureView();
}
/**
 * Snapshot state of the ChangelogKeyedStateBackend, a wrapper over {@link SnapshotResult}.
 *
 * <p>It includes three parts:
 *
 * <ul>
 *   <li>the materialized snapshot from the underlying delegated state backend;
 *   <li>the non-materialized part in the current changelog;
 *   <li>the non-materialized changelog from previous logs (before failover or rescaling).
 * </ul>
 */
private class ChangelogSnapshotState {

    /** Set initially on restore and later upon materialization. */
    private final SnapshotResult<ChangelogStateBackendHandle> changelogSnapshot;

    /**
     * The {@link SequenceNumber} up to which the state is materialized, exclusive. This
     * indicates the non-materialized part of the current changelog.
     */
    private final SequenceNumber materializedTo;

    /** ID of this materialization corresponding to the nested backend checkpoint ID. */
    private final long materializationID;

    /**
     * Construct a ChangelogSnapshotState with empty non-materialized part, which could be used
     * when triggering manual materialization.
     */
    public ChangelogSnapshotState(
            List<KeyedStateHandle> materializedSnapshot, long materializationID) {
        this(
                materializedSnapshot,
                Collections.emptyList(),
                SequenceNumber.of(Long.MAX_VALUE),
                materializationID);
    }

    /** Creates a snapshot state that has only a job-manager-owned part. */
    public ChangelogSnapshotState(
            List<KeyedStateHandle> materializedSnapshot,
            List<ChangelogStateHandle> restoredNonMaterialized,
            SequenceNumber materializedTo,
            long materializationID) {
        this.changelogSnapshot =
                SnapshotResult.of(
                        createJobManagerHandle(
                                materializedSnapshot,
                                restoredNonMaterialized,
                                materializationID));
        this.materializedTo = materializedTo;
        this.materializationID = materializationID;
    }

    /** Creates a snapshot state that has both a job-manager-owned and a task-local part. */
    public ChangelogSnapshotState(
            List<KeyedStateHandle> materializedSnapshot,
            List<KeyedStateHandle> localMaterializedSnapshot,
            List<ChangelogStateHandle> restoredNonMaterialized,
            List<ChangelogStateHandle> localRestoredNonMaterialized,
            SequenceNumber materializedTo,
            long materializationID) {
        final ChangelogStateBackendHandleImpl remoteHandle =
                createJobManagerHandle(
                        materializedSnapshot, restoredNonMaterialized, materializationID);
        final ChangelogStateBackendLocalHandle localHandle =
                new ChangelogStateBackendLocalHandle(
                        localMaterializedSnapshot, localRestoredNonMaterialized, remoteHandle);
        this.changelogSnapshot = SnapshotResult.withLocalState(remoteHandle, localHandle);
        this.materializedTo = materializedTo;
        this.materializationID = materializationID;
    }

    /**
     * Builds the job-manager-owned handle from the given parts, the enclosing backend's key
     * group range and its {@code lastCheckpointId}.
     */
    private ChangelogStateBackendHandleImpl createJobManagerHandle(
            List<KeyedStateHandle> materialized,
            List<ChangelogStateHandle> nonMaterialized,
            long id) {
        return new ChangelogStateBackendHandleImpl(
                materialized, nonMaterialized, getKeyGroupRange(), lastCheckpointId, id, 0L);
    }

    /** Materialized handles of the job-manager-owned part, or an empty list if absent. */
    public List<KeyedStateHandle> getMaterializedSnapshot() {
        final ChangelogStateBackendHandle remote =
                changelogSnapshot.getJobManagerOwnedSnapshot();
        if (remote == null) {
            return Collections.emptyList();
        }
        return remote.getMaterializedStateHandles();
    }

    /** Materialized handles of the task-local part, or an empty list if absent. */
    public List<KeyedStateHandle> getLocalMaterializedSnapshot() {
        final ChangelogStateBackendHandle local = changelogSnapshot.getTaskLocalSnapshot();
        if (local == null) {
            return Collections.emptyList();
        }
        return local.getMaterializedStateHandles();
    }

    /** Non-materialized handles of the job-manager-owned part, or an empty list if absent. */
    public List<ChangelogStateHandle> getRestoredNonMaterialized() {
        final ChangelogStateBackendHandle remote =
                changelogSnapshot.getJobManagerOwnedSnapshot();
        if (remote == null) {
            return Collections.emptyList();
        }
        return remote.getNonMaterializedStateHandles();
    }

    /** Non-materialized handles of the task-local part, or an empty list if absent. */
    public List<ChangelogStateHandle> getLocalRestoredNonMaterialized() {
        final ChangelogStateBackendHandle local = changelogSnapshot.getTaskLocalSnapshot();
        if (local == null) {
            return Collections.emptyList();
        }
        return local.getNonMaterializedStateHandles();
    }

    /** Returns the sequence number up to which the state is materialized. */
    public SequenceNumber lastMaterializedTo() {
        return materializedTo;
    }

    /** Returns the ID of the materialization this state belongs to. */
    public long getMaterializationID() {
        return materializationID;
    }
}
/** Exposes the changelog writer used by this backend; intended for tests only. */
@VisibleForTesting
StateChangelogWriter<? extends ChangelogStateHandle> getChangelogWriter() {
    final StateChangelogWriter<? extends ChangelogStateHandle> writer = stateChangelogWriter;
    return writer;
}
}