[FLINK-38216][checkpoint][refactor] Split EndOfChannelStateEvent into EndOfInputChannelStateEvent and EndOfOutputChannelStateEvent
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index f30e8e2..e73d916 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -43,7 +43,8 @@
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -70,7 +71,7 @@
 
     private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
 
-    private static final int END_OF_CHANNEL_STATE_EVENT = 5;
+    private static final int END_OF_OUTPUT_CHANNEL_STATE_EVENT = 5;
 
     private static final int ANNOUNCEMENT_EVENT = 6;
 
@@ -84,6 +85,8 @@
 
     private static final int GENERALIZED_WATERMARK_EVENT = 11;
 
+    private static final int END_OF_INPUT_CHANNEL_STATE_EVENT = 12;
+
     private static final byte CHECKPOINT_TYPE_CHECKPOINT = 0;
 
     private static final byte CHECKPOINT_TYPE_SAVEPOINT = 1;
@@ -109,8 +112,10 @@
             return serializeCheckpointBarrier((CheckpointBarrier) event);
         } else if (eventClass == EndOfSuperstepEvent.class) {
             return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_SUPERSTEP_EVENT});
-        } else if (eventClass == EndOfChannelStateEvent.class) {
-            return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_CHANNEL_STATE_EVENT});
+        } else if (eventClass == EndOfOutputChannelStateEvent.class) {
+            return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_OUTPUT_CHANNEL_STATE_EVENT});
+        } else if (eventClass == EndOfInputChannelStateEvent.class) {
+            return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_INPUT_CHANNEL_STATE_EVENT});
         } else if (eventClass == EndOfData.class) {
             return ByteBuffer.wrap(
                     new byte[] {
@@ -197,8 +202,10 @@
                 return deserializeCheckpointBarrier(buffer);
             } else if (type == END_OF_SUPERSTEP_EVENT) {
                 return EndOfSuperstepEvent.INSTANCE;
-            } else if (type == END_OF_CHANNEL_STATE_EVENT) {
-                return EndOfChannelStateEvent.INSTANCE;
+            } else if (type == END_OF_OUTPUT_CHANNEL_STATE_EVENT) {
+                return EndOfOutputChannelStateEvent.INSTANCE;
+            } else if (type == END_OF_INPUT_CHANNEL_STATE_EVENT) {
+                return EndOfInputChannelStateEvent.INSTANCE;
             } else if (type == END_OF_USER_RECORDS_EVENT) {
                 return new EndOfData(StopMode.values()[buffer.get()]);
             } else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 6c9ae40..38c0641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -24,7 +24,7 @@
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
@@ -408,7 +408,7 @@
         public static DataType getDataType(AbstractEvent event, boolean hasPriority) {
             if (hasPriority) {
                 return PRIORITIZED_EVENT_BUFFER;
-            } else if (event instanceof EndOfChannelStateEvent) {
+            } else if (event instanceof EndOfOutputChannelStateEvent) {
                 return RECOVERY_COMPLETION;
             } else if (event instanceof EndOfData) {
                 return END_OF_DATA;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index 268f34c..0060bf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -28,7 +28,7 @@
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.util.function.SupplierWithException;
 
 import javax.annotation.Nullable;
@@ -262,7 +262,7 @@
             return;
         }
         try (BufferConsumer eventBufferConsumer =
-                EventSerializer.toBufferConsumer(EndOfChannelStateEvent.INSTANCE, false)) {
+                EventSerializer.toBufferConsumer(EndOfOutputChannelStateEvent.INSTANCE, false)) {
             for (int i = 0; i < subpartitions.length; i++) {
                 if (((PipelinedSubpartition) subpartitions[i]).isSupportChannelStateRecover()) {
                     addToSubpartition(i, eventBufferConsumer.copy(), 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
similarity index 77%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
index ea0f71a..e9f39f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
@@ -22,20 +22,16 @@
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.RuntimeEvent;
 
-/**
- * Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask or {@link
- * org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} on the
- * upstream.
- */
-public class EndOfChannelStateEvent extends RuntimeEvent {
+/** Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask. */
+public class EndOfInputChannelStateEvent extends RuntimeEvent {
 
     /** The singleton instance of this event. */
-    public static final EndOfChannelStateEvent INSTANCE = new EndOfChannelStateEvent();
+    public static final EndOfInputChannelStateEvent INSTANCE = new EndOfInputChannelStateEvent();
 
     // ------------------------------------------------------------------------
 
     // not instantiable
-    private EndOfChannelStateEvent() {}
+    private EndOfInputChannelStateEvent() {}
 
     // ------------------------------------------------------------------------
 
@@ -53,12 +49,12 @@
 
     @Override
     public int hashCode() {
-        return 1965146670;
+        return 20250813;
     }
 
     @Override
     public boolean equals(Object obj) {
-        return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
+        return obj != null && obj.getClass() == EndOfInputChannelStateEvent.class;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
similarity index 83%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
index ea0f71a..fd299e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
@@ -23,19 +23,19 @@
 import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
- * Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask or {@link
+ * Marks the end of recovered state of {@link
  * org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} on the
  * upstream.
  */
-public class EndOfChannelStateEvent extends RuntimeEvent {
+public class EndOfOutputChannelStateEvent extends RuntimeEvent {
 
     /** The singleton instance of this event. */
-    public static final EndOfChannelStateEvent INSTANCE = new EndOfChannelStateEvent();
+    public static final EndOfOutputChannelStateEvent INSTANCE = new EndOfOutputChannelStateEvent();
 
     // ------------------------------------------------------------------------
 
     // not instantiable
-    private EndOfChannelStateEvent() {}
+    private EndOfOutputChannelStateEvent() {}
 
     // ------------------------------------------------------------------------
 
@@ -58,7 +58,7 @@
 
     @Override
     public boolean equals(Object obj) {
-        return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
+        return obj != null && obj.getClass() == EndOfOutputChannelStateEvent.class;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 1f41a09..1d31a65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -154,7 +154,8 @@
     }
 
     public void finishReadRecoveredState() throws IOException {
-        onRecoveredStateBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE, false));
+        onRecoveredStateBuffer(
+                EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false));
         bufferManager.releaseFloatingBuffers();
         LOG.debug("{}/{} finished recovering input.", inputGate.getOwningTaskName(), channelInfo);
     }
@@ -172,7 +173,7 @@
 
         if (next == null) {
             return null;
-        } else if (isEndOfChannelStateEvent(next)) {
+        } else if (isEndOfInputChannelStateEvent(next)) {
             stateConsumedFuture.complete(null);
             return null;
         } else {
@@ -180,14 +181,14 @@
         }
     }
 
-    private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException {
+    private boolean isEndOfInputChannelStateEvent(Buffer buffer) throws IOException {
         if (buffer.isBuffer()) {
             return false;
         }
 
         AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
         buffer.setReaderIndex(0);
-        return event.getClass() == EndOfChannelStateEvent.class;
+        return event.getClass() == EndOfInputChannelStateEvent.class;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index c076f5b..46be9e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -26,7 +26,7 @@
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
@@ -270,7 +270,7 @@
             if (checkpointedInputGate.isFinished()) {
                 return DataInputStatus.END_OF_INPUT;
             }
-        } else if (event.getClass() == EndOfChannelStateEvent.class) {
+        } else if (event.getClass() == EndOfOutputChannelStateEvent.class) {
             if (checkpointedInputGate.allChannelsRecovered()) {
                 return DataInputStatus.END_OF_RECOVERY;
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
index f3075d3..ad0dcb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
@@ -29,7 +29,7 @@
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EventAnnouncement;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
@@ -200,7 +200,7 @@
                     announcedBarrier,
                     eventAnnouncement.getSequenceNumber(),
                     bufferOrEvent.getChannelInfo());
-        } else if (bufferOrEvent.getEvent().getClass() == EndOfChannelStateEvent.class) {
+        } else if (bufferOrEvent.getEvent().getClass() == EndOfOutputChannelStateEvent.class) {
             upstreamRecoveryTracker.handleEndOfRecovery(bufferOrEvent.getChannelInfo());
         }
         return Optional.of(bufferOrEvent);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ab98e82..25a93b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -38,6 +38,8 @@
 import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
@@ -120,6 +122,9 @@
         new RecoveryMetadata(3),
         new WatermarkEvent(new LongWatermark(42L, "test"), false),
         new WatermarkEvent(new BoolWatermark(true, "test"), true),
+        new WatermarkEvent(new BoolWatermark(true, "test"), true),
+        EndOfInputChannelStateEvent.INSTANCE,
+        EndOfOutputChannelStateEvent.INSTANCE,
     };
 
     @Test
@@ -161,6 +166,9 @@
                     assertThat(bufferConsumer.build().getDataType())
                             .isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
                 }
+            } else if (evt instanceof EndOfOutputChannelStateEvent) {
+                assertThat(bufferConsumer.build().getDataType())
+                        .isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
             } else {
                 assertThat(bufferConsumer.build().getDataType())
                         .isEqualTo(Buffer.DataType.EVENT_BUFFER);
@@ -191,6 +199,8 @@
                     assertThat(buffer.getDataType())
                             .isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
                 }
+            } else if (evt instanceof EndOfOutputChannelStateEvent) {
+                assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
             } else {
                 assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
index 0ddaf07..bb60d3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
@@ -37,7 +37,7 @@
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -86,7 +86,7 @@
                 enqueueEndOfState(gate, channelIndex);
                 Optional<BufferOrEvent> bufferOrEvent = gate.pollNext();
                 while (bufferOrEvent.isPresent()
-                        && bufferOrEvent.get().getEvent() instanceof EndOfChannelStateEvent
+                        && bufferOrEvent.get().getEvent() instanceof EndOfOutputChannelStateEvent
                         && !gate.allChannelsRecovered()) {
                     bufferOrEvent = gate.pollNext();
                 }
@@ -97,7 +97,7 @@
             Optional<BufferOrEvent> polled = gate.pollNext();
             assertThat(polled).isPresent();
             assertThat(polled.get().isEvent()).isTrue();
-            assertThat(polled.get().getEvent()).isEqualTo(EndOfChannelStateEvent.INSTANCE);
+            assertThat(polled.get().getEvent()).isEqualTo(EndOfOutputChannelStateEvent.INSTANCE);
             assertThat(resumeCounter.getNumResumed()).isEqualTo(numberOfChannels);
             assertThat(gate.pollNext())
                     .as("should only be a single event no matter of what is the number of channels")
@@ -282,7 +282,7 @@
 
     private void enqueueEndOfState(CheckpointedInputGate checkpointedInputGate, int channelIndex)
             throws IOException {
-        enqueue(checkpointedInputGate, channelIndex, EndOfChannelStateEvent.INSTANCE);
+        enqueue(checkpointedInputGate, channelIndex, EndOfOutputChannelStateEvent.INSTANCE);
     }
 
     private void enqueueEndOfPartition(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index f365b3d..c9cc91b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -36,7 +36,7 @@
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
@@ -229,9 +229,9 @@
 
         inputGate.sendElement(new StreamRecord<>(42L), 0);
         assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
-        inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 0);
+        inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 0);
         assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
-        inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 1);
+        inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 1);
         assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.END_OF_RECOVERY);
     }