[HUDI-7132] Data may be lost for flink task failure (#10312)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index ec612c6..f98f1bd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -301,9 +301,7 @@
@Override
public void subtaskFailed(int i, @Nullable Throwable throwable) {
- // reset the event
- this.eventBuffer[i] = null;
- LOG.warn("Reset the event for task [" + i + "]", throwable);
+ // no operation
}
@Override
@@ -378,7 +376,8 @@
}
private void addEventToBuffer(WriteMetadataEvent event) {
- if (this.eventBuffer[event.getTaskID()] != null) {
+ if (this.eventBuffer[event.getTaskID()] != null
+ && this.eventBuffer[event.getTaskID()].getInstantTime().equals(event.getInstantTime())) {
this.eventBuffer[event.getTaskID()].mergeWith(event);
} else {
this.eventBuffer[event.getTaskID()] = event;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 5cbe989..e43463e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -148,6 +148,35 @@
}
@Test
+ public void testEventReset() {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(1, future);
+ OperatorEvent event1 = WriteMetadataEvent.builder()
+ .taskID(0)
+ .instantTime("001")
+ .writeStatus(Collections.emptyList())
+ .build();
+ coordinator.handleEventFromOperator(0, event1);
+ coordinator.subtaskFailed(0, null);
+ assertNotNull(coordinator.getEventBuffer()[0], "Events should not be cleared by subTask failure");
+
+ OperatorEvent event2 = createOperatorEvent(0, "001", "par1", false, 0.1);
+ coordinator.handleEventFromOperator(0, event2);
+ coordinator.subtaskFailed(0, null);
+ assertNotNull(coordinator.getEventBuffer()[0], "Events should not be cleared by subTask failure");
+
+ OperatorEvent event3 = createOperatorEvent(0, "001", "par1", false, 0.1);
+ coordinator.handleEventFromOperator(0, event3);
+ assertThat("Multiple events of same instant should be merged",
+ coordinator.getEventBuffer()[0].getWriteStatuses().size(), is(2));
+
+ OperatorEvent event4 = createOperatorEvent(0, "002", "par1", false, 0.1);
+ coordinator.handleEventFromOperator(0, event4);
+ assertThat("The new event should override the old event",
+ coordinator.getEventBuffer()[0].getWriteStatuses().size(), is(1));
+ }
+
+ @Test
public void testCheckpointCompleteWithPartialEvents() {
final CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);