blob: 1e5681859cdfa99cba053f0b543947aeafa9875f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.state.changelog.restore;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.StateChangeOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE;
import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateAggregateFunction;
import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateReduceFunction;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Applies {@link StateChange}s to a {@link ChangelogKeyedStateBackend}. */
@SuppressWarnings({"rawtypes", "unchecked"})
class ChangelogBackendLogApplier {
private static final Logger LOG = LoggerFactory.getLogger(ChangelogBackendLogApplier.class);
public static void apply(
StateChange stateChange,
ChangelogKeyedStateBackend<?> changelogBackend,
ClassLoader classLoader,
Map<Short, StateID> stateIds)
throws Exception {
DataInputViewStreamWrapper in =
new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange()));
applyOperation(
StateChangeOperation.byCode(in.readByte()),
stateChange.getKeyGroup(),
changelogBackend,
in,
classLoader,
ChangelogApplierFactoryImpl.INSTANCE,
stateIds);
}
private static void applyOperation(
StateChangeOperation operation,
int keyGroup,
ChangelogKeyedStateBackend<?> backend,
DataInputView in,
ClassLoader classLoader,
ChangelogApplierFactory factory,
Map<Short, StateID> stateIds)
throws Exception {
LOG.debug("apply {} in key group {}", operation, keyGroup);
if (operation == METADATA) {
applyMetaDataChange(in, backend, classLoader, stateIds);
} else if (backend.getKeyGroupRange().contains(keyGroup)) {
applyDataChange(in, factory, backend, operation, stateIds);
}
}
private static void applyMetaDataChange(
DataInputView in,
ChangelogKeyedStateBackend<?> backend,
ClassLoader classLoader,
Map<Short, StateID> stateIds)
throws Exception {
StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(in, classLoader);
RegisteredStateMetaInfoBase meta;
switch (snapshot.getBackendStateType()) {
case KEY_VALUE:
meta = restoreKvMetaData(backend, snapshot, in);
break;
case PRIORITY_QUEUE:
meta = restorePqMetaData(backend, snapshot);
break;
default:
throw new RuntimeException(
"Unsupported state type: "
+ snapshot.getBackendStateType()
+ ", sate: "
+ snapshot.getName());
}
stateIds.put(
in.readShort(),
new StateID(meta.getName(), BackendStateType.byCode(in.readByte())));
}
private static StateTtlConfig readTtlConfig(DataInputView in) throws IOException {
if (in.readBoolean()) {
try {
try (ObjectInputStream objectInputStream =
new ObjectInputStream(new DataInputViewStream(in))) {
return (StateTtlConfig) objectInputStream.readObject();
}
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
} else {
return StateTtlConfig.DISABLED;
}
}
@Nullable
private static Object readDefaultValue(
DataInputView in, RegisteredKeyValueStateBackendMetaInfo meta) throws IOException {
return in.readBoolean() ? meta.getStateSerializer().deserialize(in) : null;
}
private static RegisteredKeyValueStateBackendMetaInfo restoreKvMetaData(
ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot, DataInputView in)
throws Exception {
RegisteredKeyValueStateBackendMetaInfo meta =
new RegisteredKeyValueStateBackendMetaInfo(snapshot);
StateTtlConfig ttlConfig = readTtlConfig(in);
Object defaultValue = readDefaultValue(in, meta);
// Use regular API to create states in both changelog and the base backends the metadata is
// persisted in log before data changes.
// An alternative solution to load metadata "natively" by the base backends would require
// base state to be always present, i.e. the 1st checkpoint would have to be "full" always.
StateDescriptor stateDescriptor = toStateDescriptor(meta, defaultValue);
// todo: support changing ttl (FLINK-23143)
if (ttlConfig.isEnabled()) {
stateDescriptor.enableTimeToLive(ttlConfig);
}
backend.getOrCreateKeyedState(meta.getNamespaceSerializer(), stateDescriptor);
return meta;
}
private static StateDescriptor toStateDescriptor(
RegisteredKeyValueStateBackendMetaInfo meta, @Nullable Object defaultValue) {
switch (meta.getStateType()) {
case VALUE:
return new ValueStateDescriptor(
meta.getName(), meta.getStateSerializer(), defaultValue);
case MAP:
MapSerializer mapSerializer = (MapSerializer) meta.getStateSerializer();
return new MapStateDescriptor(
meta.getName(),
mapSerializer.getKeySerializer(),
mapSerializer.getValueSerializer());
case LIST:
return new ListStateDescriptor(
meta.getName(),
((ListSerializer) meta.getStateSerializer()).getElementSerializer());
case AGGREGATING:
return new AggregatingStateDescriptor(
meta.getName(), delegateAggregateFunction(), meta.getStateSerializer());
case REDUCING:
return new ReducingStateDescriptor(
meta.getName(), delegateReduceFunction(), meta.getStateSerializer());
default:
throw new IllegalArgumentException(meta.getStateType().toString());
}
}
private static RegisteredPriorityQueueStateBackendMetaInfo restorePqMetaData(
ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot) {
RegisteredPriorityQueueStateBackendMetaInfo meta =
new RegisteredPriorityQueueStateBackendMetaInfo(snapshot);
backend.create(meta.getName(), meta.getElementSerializer());
return meta;
}
private static StateMetaInfoSnapshot readStateMetaInfoSnapshot(
DataInputView in, ClassLoader classLoader) throws IOException {
int version = in.readInt();
StateMetaInfoReader reader =
StateMetaInfoSnapshotReadersWriters.getReader(version, KEYED_STATE);
return reader.readStateMetaInfoSnapshot(in, classLoader);
}
private static void applyDataChange(
DataInputView in,
ChangelogApplierFactory factory,
ChangelogKeyedStateBackend<?> backend,
StateChangeOperation operation,
Map<Short, StateID> stateIds)
throws Exception {
StateID id = checkNotNull(stateIds.get(in.readShort()));
ChangelogState state = backend.getExistingStateForRecovery(id.stateName, id.stateType);
StateChangeApplier changeApplier = state.getChangeApplier(factory);
changeApplier.apply(operation, in);
}
private ChangelogBackendLogApplier() {}
}