[FLINK-22944][state] Optimize writing changelog
Write state name and type once fully
and later only write a short identifier.
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index 9df0ca6..b6d9408 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -52,11 +52,13 @@
protected final RegisteredStateMetaInfoBase metaInfo;
private final StateMetaInfoSnapshot.BackendStateType stateType;
private boolean metaDataWritten = false;
+ private final short stateShortId;
public AbstractStateChangeLogger(
StateChangelogWriter<?> stateChangelogWriter,
InternalKeyContext<Key> keyContext,
- RegisteredStateMetaInfoBase metaInfo) {
+ RegisteredStateMetaInfoBase metaInfo,
+ short stateId) {
this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
this.keyContext = checkNotNull(keyContext);
this.metaInfo = checkNotNull(metaInfo);
@@ -67,6 +69,7 @@
} else {
throw new IllegalArgumentException("Unsupported state type: " + metaInfo);
}
+ this.stateShortId = stateId;
}
@Override
@@ -149,6 +152,8 @@
StateMetaInfoSnapshotReadersWriters.getWriter()
.writeStateMetaInfoSnapshot(metaInfo.snapshot(), out);
writeDefaultValueAndTtl(out);
+ out.writeShort(stateShortId);
+ out.writeByte(stateType.getCode());
}));
metaDataWritten = true;
}
@@ -164,10 +169,7 @@
return serializeRaw(
wrapper -> {
wrapper.writeByte(op.getCode());
- // todo: optimize in FLINK-22944 by either writing short code or grouping and
- // writing once (same for key, ns)
- wrapper.writeUTF(metaInfo.getName());
- wrapper.writeByte(stateType.getCode());
+ wrapper.writeShort(stateShortId);
serializeScope(ns, wrapper);
if (dataWriter != null) {
dataWriter.accept(wrapper);
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 4de278f..6f8de49 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -190,6 +190,13 @@
private final ExecutorService asyncOperationsThreadPool;
+ /**
+ * Provides a unique ID for each state created by this backend instance. A mapping from this ID
+ * to state name is written once along with metadata; afterwards, only ID is written with each
+ * state change for efficiency.
+ */
+ private short lastCreatedStateId = -1;
+
public ChangelogKeyedStateBackend(
AbstractKeyedStateBackend<K> keyedStateBackend,
ExecutionConfig executionConfig,
@@ -379,7 +386,8 @@
keyedStateBackend.getKeyContext(),
stateChangelogWriter,
new RegisteredPriorityQueueStateBackendMetaInfo<>(
- stateName, byteOrderedElementSerializer));
+ stateName, byteOrderedElementSerializer),
+ ++lastCreatedStateId);
queue =
new ChangelogKeyGroupedPriorityQueue<>(
keyedStateBackend.create(stateName, byteOrderedElementSerializer),
@@ -506,7 +514,8 @@
stateChangelogWriter,
meta,
stateDesc.getTtlConfig(),
- stateDesc.getDefaultValue());
+ stateDesc.getDefaultValue(),
+ ++lastCreatedStateId);
IS is =
stateFactory.create(
state,
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index 766f624..d801b81 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -52,8 +52,9 @@
StateChangelogWriter<?> stateChangelogWriter,
RegisteredStateMetaInfoBase metaInfo,
StateTtlConfig ttlConfig,
- @Nullable Value defaultValue) {
- super(stateChangelogWriter, keyContext, metaInfo);
+ @Nullable Value defaultValue,
+ short stateId) {
+ super(stateChangelogWriter, keyContext, metaInfo, stateId);
this.keySerializer = checkNotNull(keySerializer);
this.valueSerializer = checkNotNull(valueSerializer);
this.namespaceSerializer = checkNotNull(namespaceSerializer);
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index c800d7a..8aa5437 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -35,8 +35,9 @@
TypeSerializer<T> serializer,
InternalKeyContext<K> keyContext,
StateChangelogWriter<?> stateChangelogWriter,
- RegisteredPriorityQueueStateBackendMetaInfo<T> meta) {
- super(stateChangelogWriter, keyContext, meta);
+ RegisteredPriorityQueueStateBackendMetaInfo<T> meta,
+ short stateId) {
+ super(stateChangelogWriter, keyContext, meta, stateId);
this.serializer = checkNotNull(serializer);
}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
index a61aca6..1e56818 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
@@ -31,6 +31,7 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
@@ -48,6 +49,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.util.Map;
import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE;
import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
@@ -63,7 +65,8 @@
public static void apply(
StateChange stateChange,
ChangelogKeyedStateBackend<?> changelogBackend,
- ClassLoader classLoader)
+ ClassLoader classLoader,
+ Map<Short, StateID> stateIds)
throws Exception {
DataInputViewStreamWrapper in =
new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange()));
@@ -73,7 +76,8 @@
changelogBackend,
in,
classLoader,
- ChangelogApplierFactoryImpl.INSTANCE);
+ ChangelogApplierFactoryImpl.INSTANCE,
+ stateIds);
}
private static void applyOperation(
@@ -82,27 +86,33 @@
ChangelogKeyedStateBackend<?> backend,
DataInputView in,
ClassLoader classLoader,
- ChangelogApplierFactory factory)
+ ChangelogApplierFactory factory,
+ Map<Short, StateID> stateIds)
throws Exception {
LOG.debug("apply {} in key group {}", operation, keyGroup);
if (operation == METADATA) {
- applyMetaDataChange(in, backend, classLoader);
+ applyMetaDataChange(in, backend, classLoader, stateIds);
} else if (backend.getKeyGroupRange().contains(keyGroup)) {
- applyDataChange(in, factory, backend, operation);
+ applyDataChange(in, factory, backend, operation, stateIds);
}
}
private static void applyMetaDataChange(
- DataInputView in, ChangelogKeyedStateBackend<?> backend, ClassLoader classLoader)
+ DataInputView in,
+ ChangelogKeyedStateBackend<?> backend,
+ ClassLoader classLoader,
+ Map<Short, StateID> stateIds)
throws Exception {
+
StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(in, classLoader);
+ RegisteredStateMetaInfoBase meta;
switch (snapshot.getBackendStateType()) {
case KEY_VALUE:
- restoreKvMetaData(backend, snapshot, in);
- return;
+ meta = restoreKvMetaData(backend, snapshot, in);
+ break;
case PRIORITY_QUEUE:
- restorePqMetaData(backend, snapshot);
- return;
+ meta = restorePqMetaData(backend, snapshot);
+ break;
default:
throw new RuntimeException(
"Unsupported state type: "
@@ -110,6 +120,9 @@
+ ", sate: "
+ snapshot.getName());
}
+ stateIds.put(
+ in.readShort(),
+ new StateID(meta.getName(), BackendStateType.byCode(in.readByte())));
}
private static StateTtlConfig readTtlConfig(DataInputView in) throws IOException {
@@ -133,7 +146,7 @@
return in.readBoolean() ? meta.getStateSerializer().deserialize(in) : null;
}
- private static void restoreKvMetaData(
+ private static RegisteredKeyValueStateBackendMetaInfo restoreKvMetaData(
ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot, DataInputView in)
throws Exception {
RegisteredKeyValueStateBackendMetaInfo meta =
@@ -150,6 +163,7 @@
stateDescriptor.enableTimeToLive(ttlConfig);
}
backend.getOrCreateKeyedState(meta.getNamespaceSerializer(), stateDescriptor);
+ return meta;
}
private static StateDescriptor toStateDescriptor(
@@ -179,11 +193,12 @@
}
}
- private static void restorePqMetaData(
+ private static RegisteredPriorityQueueStateBackendMetaInfo restorePqMetaData(
ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot) {
RegisteredPriorityQueueStateBackendMetaInfo meta =
new RegisteredPriorityQueueStateBackendMetaInfo(snapshot);
backend.create(meta.getName(), meta.getElementSerializer());
+ return meta;
}
private static StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -198,11 +213,11 @@
DataInputView in,
ChangelogApplierFactory factory,
ChangelogKeyedStateBackend<?> backend,
- StateChangeOperation operation)
+ StateChangeOperation operation,
+ Map<Short, StateID> stateIds)
throws Exception {
- String name = checkNotNull(in.readUTF());
- BackendStateType type = BackendStateType.byCode(in.readByte());
- ChangelogState state = backend.getExistingStateForRecovery(name, type);
+ StateID id = checkNotNull(stateIds.get(in.readShort()));
+ ChangelogState state = backend.getExistingStateForRecovery(id.stateName, id.stateType);
StateChangeApplier changeApplier = state.getChangeApplier(factory);
changeApplier.apply(operation, in);
}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
index 4ab881c..d01ba06 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
@@ -31,6 +31,8 @@
import org.apache.flink.util.function.FunctionWithException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -82,12 +84,14 @@
StateChangelogHandleReader<T> changelogHandleReader,
ClassLoader classLoader)
throws Exception {
+ Map<Short, StateID> stateIds = new HashMap<>();
for (ChangelogStateHandle changelogHandle :
backendHandle.getNonMaterializedStateHandles()) {
try (CloseableIterator<StateChange> changes =
changelogHandleReader.getChanges((T) changelogHandle)) {
while (changes.hasNext()) {
- ChangelogBackendLogApplier.apply(changes.next(), backend, classLoader);
+ ChangelogBackendLogApplier.apply(
+ changes.next(), backend, classLoader, stateIds);
}
}
}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateID.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateID.java
new file mode 100644
index 0000000..79f9761
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateID.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+
+/** Identifies a state during recovery. */
+final class StateID {
+ final StateMetaInfoSnapshot.BackendStateType stateType;
+ final String stateName;
+
+ StateID(String stateName, StateMetaInfoSnapshot.BackendStateType stateType) {
+ this.stateType = stateType;
+ this.stateName = stateName;
+ }
+}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
index a77fcfa..50a6070 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
@@ -50,7 +50,8 @@
writer,
metaInfo,
StateTtlConfig.DISABLED,
- "default");
+ "default",
+ Short.MIN_VALUE);
}
@Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
index 672682d..946e4ed 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -37,7 +37,7 @@
RegisteredPriorityQueueStateBackendMetaInfo<String> metaInfo =
new RegisteredPriorityQueueStateBackendMetaInfo<>("test", valueSerializer);
return new PriorityQueueStateChangeLoggerImpl<>(
- valueSerializer, keyContext, writer, metaInfo);
+ valueSerializer, keyContext, writer, metaInfo, Short.MIN_VALUE);
}
@Override