[FLINK-38216][checkpoint][refactor] Split EndOfChannelStateEvent into EndOfInputChannelStateEvent and EndOfOutputChannelStateEvent
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index f30e8e2..e73d916 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -43,7 +43,8 @@
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.InstantiationUtil;
@@ -70,7 +71,7 @@
private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
- private static final int END_OF_CHANNEL_STATE_EVENT = 5;
+ private static final int END_OF_OUTPUT_CHANNEL_STATE_EVENT = 5;
private static final int ANNOUNCEMENT_EVENT = 6;
@@ -84,6 +85,8 @@
private static final int GENERALIZED_WATERMARK_EVENT = 11;
+ private static final int END_OF_INPUT_CHANNEL_STATE_EVENT = 12;
+
private static final byte CHECKPOINT_TYPE_CHECKPOINT = 0;
private static final byte CHECKPOINT_TYPE_SAVEPOINT = 1;
@@ -109,8 +112,10 @@
return serializeCheckpointBarrier((CheckpointBarrier) event);
} else if (eventClass == EndOfSuperstepEvent.class) {
return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_SUPERSTEP_EVENT});
- } else if (eventClass == EndOfChannelStateEvent.class) {
- return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_CHANNEL_STATE_EVENT});
+ } else if (eventClass == EndOfOutputChannelStateEvent.class) {
+ return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_OUTPUT_CHANNEL_STATE_EVENT});
+ } else if (eventClass == EndOfInputChannelStateEvent.class) {
+ return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_INPUT_CHANNEL_STATE_EVENT});
} else if (eventClass == EndOfData.class) {
return ByteBuffer.wrap(
new byte[] {
@@ -197,8 +202,10 @@
return deserializeCheckpointBarrier(buffer);
} else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
- } else if (type == END_OF_CHANNEL_STATE_EVENT) {
- return EndOfChannelStateEvent.INSTANCE;
+ } else if (type == END_OF_OUTPUT_CHANNEL_STATE_EVENT) {
+ return EndOfOutputChannelStateEvent.INSTANCE;
+ } else if (type == END_OF_INPUT_CHANNEL_STATE_EVENT) {
+ return EndOfInputChannelStateEvent.INSTANCE;
} else if (type == END_OF_USER_RECORDS_EVENT) {
return new EndOfData(StopMode.values()[buffer.get()]);
} else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 6c9ae40..38c0641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -24,7 +24,7 @@
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
@@ -408,7 +408,7 @@
public static DataType getDataType(AbstractEvent event, boolean hasPriority) {
if (hasPriority) {
return PRIORITIZED_EVENT_BUFFER;
- } else if (event instanceof EndOfChannelStateEvent) {
+ } else if (event instanceof EndOfOutputChannelStateEvent) {
return RECOVERY_COMPLETION;
} else if (event instanceof EndOfData) {
return END_OF_DATA;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index 268f34c..0060bf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -28,7 +28,7 @@
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.util.function.SupplierWithException;
import javax.annotation.Nullable;
@@ -262,7 +262,7 @@
return;
}
try (BufferConsumer eventBufferConsumer =
- EventSerializer.toBufferConsumer(EndOfChannelStateEvent.INSTANCE, false)) {
+ EventSerializer.toBufferConsumer(EndOfOutputChannelStateEvent.INSTANCE, false)) {
for (int i = 0; i < subpartitions.length; i++) {
if (((PipelinedSubpartition) subpartitions[i]).isSupportChannelStateRecover()) {
addToSubpartition(i, eventBufferConsumer.copy(), 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
similarity index 77%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
index ea0f71a..e9f39f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfInputChannelStateEvent.java
@@ -22,20 +22,16 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.RuntimeEvent;
-/**
- * Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask or {@link
- * org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} on the
- * upstream.
- */
-public class EndOfChannelStateEvent extends RuntimeEvent {
+/** Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask. */
+public class EndOfInputChannelStateEvent extends RuntimeEvent {
/** The singleton instance of this event. */
- public static final EndOfChannelStateEvent INSTANCE = new EndOfChannelStateEvent();
+ public static final EndOfInputChannelStateEvent INSTANCE = new EndOfInputChannelStateEvent();
// ------------------------------------------------------------------------
// not instantiable
- private EndOfChannelStateEvent() {}
+ private EndOfInputChannelStateEvent() {}
// ------------------------------------------------------------------------
@@ -53,12 +49,12 @@
@Override
public int hashCode() {
- return 1965146670;
+ return 20250813;
}
@Override
public boolean equals(Object obj) {
- return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
+ return obj != null && obj.getClass() == EndOfInputChannelStateEvent.class;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
similarity index 83%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
index ea0f71a..fd299e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfChannelStateEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/EndOfOutputChannelStateEvent.java
@@ -23,19 +23,19 @@
import org.apache.flink.runtime.event.RuntimeEvent;
/**
- * Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask or {@link
+ * Marks the end of recovered state of {@link
* org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} on the
* upstream.
*/
-public class EndOfChannelStateEvent extends RuntimeEvent {
+public class EndOfOutputChannelStateEvent extends RuntimeEvent {
/** The singleton instance of this event. */
- public static final EndOfChannelStateEvent INSTANCE = new EndOfChannelStateEvent();
+ public static final EndOfOutputChannelStateEvent INSTANCE = new EndOfOutputChannelStateEvent();
// ------------------------------------------------------------------------
// not instantiable
- private EndOfChannelStateEvent() {}
+ private EndOfOutputChannelStateEvent() {}
// ------------------------------------------------------------------------
@@ -58,7 +58,7 @@
@Override
public boolean equals(Object obj) {
- return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
+ return obj != null && obj.getClass() == EndOfOutputChannelStateEvent.class;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 1f41a09..1d31a65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -154,7 +154,8 @@
}
public void finishReadRecoveredState() throws IOException {
- onRecoveredStateBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE, false));
+ onRecoveredStateBuffer(
+ EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false));
bufferManager.releaseFloatingBuffers();
LOG.debug("{}/{} finished recovering input.", inputGate.getOwningTaskName(), channelInfo);
}
@@ -172,7 +173,7 @@
if (next == null) {
return null;
- } else if (isEndOfChannelStateEvent(next)) {
+ } else if (isEndOfInputChannelStateEvent(next)) {
stateConsumedFuture.complete(null);
return null;
} else {
@@ -180,14 +181,14 @@
}
}
- private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException {
+ private boolean isEndOfInputChannelStateEvent(Buffer buffer) throws IOException {
if (buffer.isBuffer()) {
return false;
}
AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
buffer.setReaderIndex(0);
- return event.getClass() == EndOfChannelStateEvent.class;
+ return event.getClass() == EndOfInputChannelStateEvent.class;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index c076f5b..46be9e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -26,7 +26,7 @@
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
@@ -270,7 +270,7 @@
if (checkpointedInputGate.isFinished()) {
return DataInputStatus.END_OF_INPUT;
}
- } else if (event.getClass() == EndOfChannelStateEvent.class) {
+ } else if (event.getClass() == EndOfOutputChannelStateEvent.class) {
if (checkpointedInputGate.allChannelsRecovered()) {
return DataInputStatus.END_OF_RECOVERY;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
index f3075d3..ad0dcb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java
@@ -29,7 +29,7 @@
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
@@ -200,7 +200,7 @@
announcedBarrier,
eventAnnouncement.getSequenceNumber(),
bufferOrEvent.getChannelInfo());
- } else if (bufferOrEvent.getEvent().getClass() == EndOfChannelStateEvent.class) {
+ } else if (bufferOrEvent.getEvent().getClass() == EndOfOutputChannelStateEvent.class) {
upstreamRecoveryTracker.handleEndOfRecovery(bufferOrEvent.getChannelInfo());
}
return Optional.of(bufferOrEvent);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ab98e82..25a93b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -38,6 +38,8 @@
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -120,6 +122,9 @@
new RecoveryMetadata(3),
new WatermarkEvent(new LongWatermark(42L, "test"), false),
new WatermarkEvent(new BoolWatermark(true, "test"), true),
+ new WatermarkEvent(new BoolWatermark(true, "test"), true),
+ EndOfInputChannelStateEvent.INSTANCE,
+ EndOfOutputChannelStateEvent.INSTANCE,
};
@Test
@@ -161,6 +166,9 @@
assertThat(bufferConsumer.build().getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+ assertThat(bufferConsumer.build().getDataType())
+ .isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
} else {
assertThat(bufferConsumer.build().getDataType())
.isEqualTo(Buffer.DataType.EVENT_BUFFER);
@@ -191,6 +199,8 @@
assertThat(buffer.getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+ assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
} else {
assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
index 0ddaf07..bb60d3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java
@@ -37,7 +37,7 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -86,7 +86,7 @@
enqueueEndOfState(gate, channelIndex);
Optional<BufferOrEvent> bufferOrEvent = gate.pollNext();
while (bufferOrEvent.isPresent()
- && bufferOrEvent.get().getEvent() instanceof EndOfChannelStateEvent
+ && bufferOrEvent.get().getEvent() instanceof EndOfOutputChannelStateEvent
&& !gate.allChannelsRecovered()) {
bufferOrEvent = gate.pollNext();
}
@@ -97,7 +97,7 @@
Optional<BufferOrEvent> polled = gate.pollNext();
assertThat(polled).isPresent();
assertThat(polled.get().isEvent()).isTrue();
- assertThat(polled.get().getEvent()).isEqualTo(EndOfChannelStateEvent.INSTANCE);
+ assertThat(polled.get().getEvent()).isEqualTo(EndOfOutputChannelStateEvent.INSTANCE);
assertThat(resumeCounter.getNumResumed()).isEqualTo(numberOfChannels);
assertThat(gate.pollNext())
.as("should only be a single event no matter of what is the number of channels")
@@ -282,7 +282,7 @@
private void enqueueEndOfState(CheckpointedInputGate checkpointedInputGate, int channelIndex)
throws IOException {
- enqueue(checkpointedInputGate, channelIndex, EndOfChannelStateEvent.INSTANCE);
+ enqueue(checkpointedInputGate, channelIndex, EndOfOutputChannelStateEvent.INSTANCE);
}
private void enqueueEndOfPartition(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index f365b3d..c9cc91b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -36,7 +36,7 @@
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
-import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
@@ -229,9 +229,9 @@
inputGate.sendElement(new StreamRecord<>(42L), 0);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
- inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 0);
+ inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 0);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
- inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 1);
+ inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 1);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.END_OF_RECOVERY);
}