[HUDI-7132] Data may be lost for flink task failure (#10312)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index ec612c6..f98f1bd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -301,9 +301,7 @@
 
   @Override
   public void subtaskFailed(int i, @Nullable Throwable throwable) {
-    // reset the event
-    this.eventBuffer[i] = null;
-    LOG.warn("Reset the event for task [" + i + "]", throwable);
+    // no operation
   }
 
   @Override
@@ -378,7 +376,8 @@
   }
 
   private void addEventToBuffer(WriteMetadataEvent event) {
-    if (this.eventBuffer[event.getTaskID()] != null) {
+    if (this.eventBuffer[event.getTaskID()] != null
+        && this.eventBuffer[event.getTaskID()].getInstantTime().equals(event.getInstantTime())) {
       this.eventBuffer[event.getTaskID()].mergeWith(event);
     } else {
       this.eventBuffer[event.getTaskID()] = event;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 5cbe989..e43463e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -148,6 +148,35 @@
   }
 
   @Test
+  public void testEventReset() {
+    CompletableFuture<byte[]> future = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(1, future);
+    OperatorEvent event1 = WriteMetadataEvent.builder()
+        .taskID(0)
+        .instantTime("001")
+        .writeStatus(Collections.emptyList())
+        .build();
+    coordinator.handleEventFromOperator(0, event1);
+    coordinator.subtaskFailed(0, null);
+    assertNotNull(coordinator.getEventBuffer()[0], "Events should not be cleared by subTask failure");
+
+    OperatorEvent event2 = createOperatorEvent(0, "001", "par1", false, 0.1);
+    coordinator.handleEventFromOperator(0, event2);
+    coordinator.subtaskFailed(0, null);
+    assertNotNull(coordinator.getEventBuffer()[0], "Events should not be cleared by subTask failure");
+
+    OperatorEvent event3 = createOperatorEvent(0, "001", "par1", false, 0.1);
+    coordinator.handleEventFromOperator(0, event3);
+    assertThat("Multiple events of same instant should be merged",
+        coordinator.getEventBuffer()[0].getWriteStatuses().size(), is(2));
+
+    OperatorEvent event4 = createOperatorEvent(0, "002", "par1", false, 0.1);
+    coordinator.handleEventFromOperator(0, event4);
+    assertThat("The new event should override the old event",
+        coordinator.getEventBuffer()[0].getWriteStatuses().size(), is(1));
+  }
+
+  @Test
   public void testCheckpointCompleteWithPartialEvents() {
     final CompletableFuture<byte[]> future = new CompletableFuture<>();
     coordinator.checkpointCoordinator(1, future);