[FLINK-22944][state] Optimize writing changelog

Write state name and type once fully
and later only write a short identifier.
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index 9df0ca6..b6d9408 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -52,11 +52,13 @@
     protected final RegisteredStateMetaInfoBase metaInfo;
     private final StateMetaInfoSnapshot.BackendStateType stateType;
     private boolean metaDataWritten = false;
+    private final short stateShortId;
 
     public AbstractStateChangeLogger(
             StateChangelogWriter<?> stateChangelogWriter,
             InternalKeyContext<Key> keyContext,
-            RegisteredStateMetaInfoBase metaInfo) {
+            RegisteredStateMetaInfoBase metaInfo,
+            short stateId) {
         this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
         this.keyContext = checkNotNull(keyContext);
         this.metaInfo = checkNotNull(metaInfo);
@@ -67,6 +69,7 @@
         } else {
             throw new IllegalArgumentException("Unsupported state type: " + metaInfo);
         }
+        this.stateShortId = stateId;
     }
 
     @Override
@@ -149,6 +152,8 @@
                                 StateMetaInfoSnapshotReadersWriters.getWriter()
                                         .writeStateMetaInfoSnapshot(metaInfo.snapshot(), out);
                                 writeDefaultValueAndTtl(out);
+                                out.writeShort(stateShortId);
+                                out.writeByte(stateType.getCode());
                             }));
             metaDataWritten = true;
         }
@@ -164,10 +169,7 @@
         return serializeRaw(
                 wrapper -> {
                     wrapper.writeByte(op.getCode());
-                    // todo: optimize in FLINK-22944 by either writing short code or grouping and
-                    // writing once (same for key, ns)
-                    wrapper.writeUTF(metaInfo.getName());
-                    wrapper.writeByte(stateType.getCode());
+                    wrapper.writeShort(stateShortId);
                     serializeScope(ns, wrapper);
                     if (dataWriter != null) {
                         dataWriter.accept(wrapper);
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 4de278f..6f8de49 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -190,6 +190,13 @@
 
     private final ExecutorService asyncOperationsThreadPool;
 
+    /**
+     * Provides a unique ID for each state created by this backend instance. A mapping from this ID
+     * to state name is written once along with metadata; afterwards, only ID is written with each
+     * state change for efficiency.
+     */
+    private short lastCreatedStateId = -1;
+
     public ChangelogKeyedStateBackend(
             AbstractKeyedStateBackend<K> keyedStateBackend,
             ExecutionConfig executionConfig,
@@ -379,7 +386,8 @@
                             keyedStateBackend.getKeyContext(),
                             stateChangelogWriter,
                             new RegisteredPriorityQueueStateBackendMetaInfo<>(
-                                    stateName, byteOrderedElementSerializer));
+                                    stateName, byteOrderedElementSerializer),
+                            ++lastCreatedStateId);
             queue =
                     new ChangelogKeyGroupedPriorityQueue<>(
                             keyedStateBackend.create(stateName, byteOrderedElementSerializer),
@@ -506,7 +514,8 @@
                         stateChangelogWriter,
                         meta,
                         stateDesc.getTtlConfig(),
-                        stateDesc.getDefaultValue());
+                        stateDesc.getDefaultValue(),
+                        ++lastCreatedStateId);
         IS is =
                 stateFactory.create(
                         state,
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index 766f624..d801b81 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -52,8 +52,9 @@
             StateChangelogWriter<?> stateChangelogWriter,
             RegisteredStateMetaInfoBase metaInfo,
             StateTtlConfig ttlConfig,
-            @Nullable Value defaultValue) {
-        super(stateChangelogWriter, keyContext, metaInfo);
+            @Nullable Value defaultValue,
+            short stateId) {
+        super(stateChangelogWriter, keyContext, metaInfo, stateId);
         this.keySerializer = checkNotNull(keySerializer);
         this.valueSerializer = checkNotNull(valueSerializer);
         this.namespaceSerializer = checkNotNull(namespaceSerializer);
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index c800d7a..8aa5437 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -35,8 +35,9 @@
             TypeSerializer<T> serializer,
             InternalKeyContext<K> keyContext,
             StateChangelogWriter<?> stateChangelogWriter,
-            RegisteredPriorityQueueStateBackendMetaInfo<T> meta) {
-        super(stateChangelogWriter, keyContext, meta);
+            RegisteredPriorityQueueStateBackendMetaInfo<T> meta,
+            short stateId) {
+        super(stateChangelogWriter, keyContext, meta, stateId);
         this.serializer = checkNotNull(serializer);
     }
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
index a61aca6..1e56818 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
@@ -31,6 +31,7 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
@@ -48,6 +49,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.util.Map;
 
 import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE;
 import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
@@ -63,7 +65,8 @@
     public static void apply(
             StateChange stateChange,
             ChangelogKeyedStateBackend<?> changelogBackend,
-            ClassLoader classLoader)
+            ClassLoader classLoader,
+            Map<Short, StateID> stateIds)
             throws Exception {
         DataInputViewStreamWrapper in =
                 new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange()));
@@ -73,7 +76,8 @@
                 changelogBackend,
                 in,
                 classLoader,
-                ChangelogApplierFactoryImpl.INSTANCE);
+                ChangelogApplierFactoryImpl.INSTANCE,
+                stateIds);
     }
 
     private static void applyOperation(
@@ -82,27 +86,33 @@
             ChangelogKeyedStateBackend<?> backend,
             DataInputView in,
             ClassLoader classLoader,
-            ChangelogApplierFactory factory)
+            ChangelogApplierFactory factory,
+            Map<Short, StateID> stateIds)
             throws Exception {
         LOG.debug("apply {} in key group {}", operation, keyGroup);
         if (operation == METADATA) {
-            applyMetaDataChange(in, backend, classLoader);
+            applyMetaDataChange(in, backend, classLoader, stateIds);
         } else if (backend.getKeyGroupRange().contains(keyGroup)) {
-            applyDataChange(in, factory, backend, operation);
+            applyDataChange(in, factory, backend, operation, stateIds);
         }
     }
 
     private static void applyMetaDataChange(
-            DataInputView in, ChangelogKeyedStateBackend<?> backend, ClassLoader classLoader)
+            DataInputView in,
+            ChangelogKeyedStateBackend<?> backend,
+            ClassLoader classLoader,
+            Map<Short, StateID> stateIds)
             throws Exception {
+
         StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(in, classLoader);
+        RegisteredStateMetaInfoBase meta;
         switch (snapshot.getBackendStateType()) {
             case KEY_VALUE:
-                restoreKvMetaData(backend, snapshot, in);
-                return;
+                meta = restoreKvMetaData(backend, snapshot, in);
+                break;
             case PRIORITY_QUEUE:
-                restorePqMetaData(backend, snapshot);
-                return;
+                meta = restorePqMetaData(backend, snapshot);
+                break;
             default:
                 throw new RuntimeException(
                         "Unsupported state type: "
@@ -110,6 +120,9 @@
                                 + ", sate: "
                                 + snapshot.getName());
         }
+        stateIds.put(
+                in.readShort(),
+                new StateID(meta.getName(), BackendStateType.byCode(in.readByte())));
     }
 
     private static StateTtlConfig readTtlConfig(DataInputView in) throws IOException {
@@ -133,7 +146,7 @@
         return in.readBoolean() ? meta.getStateSerializer().deserialize(in) : null;
     }
 
-    private static void restoreKvMetaData(
+    private static RegisteredKeyValueStateBackendMetaInfo restoreKvMetaData(
             ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot, DataInputView in)
             throws Exception {
         RegisteredKeyValueStateBackendMetaInfo meta =
@@ -150,6 +163,7 @@
             stateDescriptor.enableTimeToLive(ttlConfig);
         }
         backend.getOrCreateKeyedState(meta.getNamespaceSerializer(), stateDescriptor);
+        return meta;
     }
 
     private static StateDescriptor toStateDescriptor(
@@ -179,11 +193,12 @@
         }
     }
 
-    private static void restorePqMetaData(
+    private static RegisteredPriorityQueueStateBackendMetaInfo restorePqMetaData(
             ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot) {
         RegisteredPriorityQueueStateBackendMetaInfo meta =
                 new RegisteredPriorityQueueStateBackendMetaInfo(snapshot);
         backend.create(meta.getName(), meta.getElementSerializer());
+        return meta;
     }
 
     private static StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -198,11 +213,11 @@
             DataInputView in,
             ChangelogApplierFactory factory,
             ChangelogKeyedStateBackend<?> backend,
-            StateChangeOperation operation)
+            StateChangeOperation operation,
+            Map<Short, StateID> stateIds)
             throws Exception {
-        String name = checkNotNull(in.readUTF());
-        BackendStateType type = BackendStateType.byCode(in.readByte());
-        ChangelogState state = backend.getExistingStateForRecovery(name, type);
+        StateID id = checkNotNull(stateIds.get(in.readShort()));
+        ChangelogState state = backend.getExistingStateForRecovery(id.stateName, id.stateType);
         StateChangeApplier changeApplier = state.getChangeApplier(factory);
         changeApplier.apply(operation, in);
     }
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
index 4ab881c..d01ba06 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
@@ -31,6 +31,8 @@
 import org.apache.flink.util.function.FunctionWithException;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
@@ -82,12 +84,14 @@
             StateChangelogHandleReader<T> changelogHandleReader,
             ClassLoader classLoader)
             throws Exception {
+        Map<Short, StateID> stateIds = new HashMap<>();
         for (ChangelogStateHandle changelogHandle :
                 backendHandle.getNonMaterializedStateHandles()) {
             try (CloseableIterator<StateChange> changes =
                     changelogHandleReader.getChanges((T) changelogHandle)) {
                 while (changes.hasNext()) {
-                    ChangelogBackendLogApplier.apply(changes.next(), backend, classLoader);
+                    ChangelogBackendLogApplier.apply(
+                            changes.next(), backend, classLoader, stateIds);
                 }
             }
         }
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateID.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateID.java
new file mode 100644
index 0000000..79f9761
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateID.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+
+/** Identifies a state during recovery. */
+final class StateID {
+    final StateMetaInfoSnapshot.BackendStateType stateType;
+    final String stateName;
+
+    StateID(String stateName, StateMetaInfoSnapshot.BackendStateType stateType) {
+        this.stateType = stateType;
+        this.stateName = stateName;
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
index a77fcfa..50a6070 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
@@ -50,7 +50,8 @@
                 writer,
                 metaInfo,
                 StateTtlConfig.DISABLED,
-                "default");
+                "default",
+                Short.MIN_VALUE);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
index 672682d..946e4ed 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -37,7 +37,7 @@
         RegisteredPriorityQueueStateBackendMetaInfo<String> metaInfo =
                 new RegisteredPriorityQueueStateBackendMetaInfo<>("test", valueSerializer);
         return new PriorityQueueStateChangeLoggerImpl<>(
-                valueSerializer, keyContext, writer, metaInfo);
+                valueSerializer, keyContext, writer, metaInfo, Short.MIN_VALUE);
     }
 
     @Override