[HUDI-6962] Fix the conflicts resolution for bulk insert under NB-CC (#9896)

* Flink bulk_insert now uses a fixed file group id suffix when NB-CC is enabled;
* The bulk_insert writer now resolves conflicts with other writers under OCC strategies (see the sketch below).
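
A minimal sketch of the conflict-resolution decision this patch introduces (illustrative class, enum, and parameter names only, not the Hudi API; the real check lives in HoodieWriteConfig#needResolveWriteConflict). On the bulk_insert path itself, NB-CC additionally switches the bucket file id from a random prefix (BucketIdentifier.newBucketFileIdPrefix) to a fixed suffix (BucketIdentifier.newBucketFileIdFixedSuffix) so concurrent writers resolve a bucket to the same file group.

```java
// Sketch only: SINGLE_WRITER never resolves conflicts, plain OCC always does,
// and NB-CC skips conflict resolution except for BULK_INSERT, which writes
// base files directly and therefore still goes through the OCC conflict check.
public class ConflictResolutionSketch {

  enum ConcurrencyMode { SINGLE_WRITER, OPTIMISTIC_CONCURRENCY_CONTROL }

  enum OperationType { UPSERT, INSERT, BULK_INSERT }

  static boolean needResolveWriteConflict(ConcurrencyMode mode,
                                          boolean nonBlockingConcurrencyControl,
                                          OperationType operationType) {
    if (mode != ConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) {
      // SINGLE_WRITER: there is no other writer to conflict with
      return false;
    }
    // Under NB-CC, only bulk_insert still needs the OCC conflict check
    return operationType == OperationType.BULK_INSERT || !nonBlockingConcurrencyControl;
  }

  public static void main(String[] args) {
    // NB-CC streaming upsert: conflict resolution is skipped -> false
    System.out.println(needResolveWriteConflict(
        ConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, true, OperationType.UPSERT));
    // NB-CC bulk_insert: behaves like a regular OCC writer -> true
    System.out.println(needResolveWriteConflict(
        ConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, true, OperationType.BULK_INSERT));
  }
}
```
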
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index 15f6be8..1bea517 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.client.transaction.ConcurrentOperation;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -67,8 +68,8 @@
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
       boolean reloadActiveTimeline,
       Set<String> pendingInstants) throws HoodieWriteConflictException {
-    // Skip to resolve conflict if using non-blocking concurrency control
-    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() && !config.isNonBlockingConcurrencyControl()) {
+    WriteOperationType operationType = thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null);
+    if (config.needResolveWriteConflict(operationType)) {
       // deal with pendingInstants
       Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c9e9b94..8c08bea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -46,6 +46,7 @@
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.RecordPayloadType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.marker.MarkerType;
@@ -2616,6 +2617,16 @@
     return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
   }
 
+  public boolean needResolveWriteConflict(WriteOperationType operationType) {
+    if (getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      // NB-CC does not need to resolve write conflicts, except for the bulk insert operation
+      return WriteOperationType.BULK_INSERT == operationType || !isNonBlockingConcurrencyControl();
+    } else {
+      // the SINGLE_WRITER case does not need to resolve write conflicts
+      return false;
+    }
+  }
+
   public boolean isNonBlockingConcurrencyControl() {
     return getTableType().equals(HoodieTableType.MERGE_ON_READ)
         && getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index e412678..a52e547 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -302,8 +302,8 @@
    * Refresh the last transaction metadata,
    * should be called before the Driver starts a new transaction.
    */
-  public void preTxn(HoodieTableMetaClient metaClient) {
-    if (txnManager.isLockRequired() && !config.isNonBlockingConcurrencyControl()) {
+  public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient) {
+    if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType)) {
       // refresh the meta client which is reused
       metaClient.reloadActiveTimeline();
       this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 34fb5cf..860dffe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -387,7 +387,7 @@
 
   private void startInstant() {
     // refresh the last txn metadata
-    this.writeClient.preTxn(this.metaClient);
+    this.writeClient.preTxn(tableState.operationType, this.metaClient);
     // put the assignment in front of metadata generation,
     // because the instant request from write task is asynchronous.
     this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index fcf38e1..c25a69d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -92,16 +92,22 @@
     return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
   }
 
-  private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
+  private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) {
     String recordKey = keyGen.getRecordKey(record);
     String partition = keyGen.getPartitionPath(record);
     final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
     String bucketId = partition + bucketNum;
-    return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum));
+    return bucketIdToFileId.computeIfAbsent(bucketId, k -> {
+      if (needFixedFileIdSuffix) {
+        return BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum);
+      } else {
+        return BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+      }
+    });
   }
 
-  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
-    final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets);
+  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) {
+    final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets, needFixedFileIdSuffix);
     return GenericRowData.of(StringData.fromString(fileId), record);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index d44ef25..dfd8e0e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -117,11 +117,11 @@
     this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
     this.initInstant = lastPendingInstant();
     sendBootstrapEvent();
-    initWriterHelper();
   }
 
   @Override
   public void processElement(I value, Context ctx, Collector<Object> out) throws IOException {
+    initWriterHelperIfNeeded();
     this.writerHelper.write((RowData) value);
   }
 
@@ -136,6 +136,7 @@
    * End input action for batch source.
    */
   public void endInput() {
+    initWriterHelperIfNeeded();
     final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
 
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
@@ -165,11 +166,13 @@
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private void initWriterHelper() {
-    String instant = instantToWrite();
-    this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
-        instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
-        this.rowType);
+  private void initWriterHelperIfNeeded() {
+    if (writerHelper == null) {
+      String instant = instantToWrite();
+      this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+          instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+          this.rowType);
+    }
   }
 
   private void sendBootstrapEvent() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index cb9344f..e66009a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -124,10 +124,11 @@
       RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
       RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
+      boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
 
       Map<String, String> bucketIdToFileId = new HashMap<>();
       dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
-          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets, needFixedFileIdSuffix), typeInfo)
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 79320e1..816c9a4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.utils.TestData;
@@ -228,6 +229,121 @@
     TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
   }
 
+  // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
+  //      |----------- txn1 -----------|
+  //                       |----- txn2 ------|
+  // txn2 should fail to commit because of the conflict
+  @Test
+  public void testBulkInsertInMultiWriter() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
+    // disable schedule compaction in writers
+    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    // start pipeline1 and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<RowData> dataset1 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), StringData.fromString("Danny"), null,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+    TestHarness pipeline1 = preparePipeline(conf)
+        .consume(dataset1)
+        .assertEmptyDataFiles();
+
+    // start pipeline2 and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.OPERATION, "BULK_INSERT");
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    List<RowData> dataset2 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), null, 23,
+            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+    TestHarness pipeline2 = preparePipeline(conf2)
+        .consume(dataset2);
+
+    // step to commit the 1st txn
+    pipeline1.checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1);
+
+    // step to commit the 2nd txn, which should throw an exception
+    pipeline2.endInputThrows(HoodieWriteConflictException.class, "Cannot resolve conflicts");
+  }
+
+  // case2: txn1 is bulk_insert writer, txn2 is upsert writer.
+  //      |----------- txn1 -----------|
+  //               |----------- txn2 -----------|
+  // both txns should commit successfully
+  @Test
+  public void testBulkInsertInSequence() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
+    // disable schedule compaction in writers
+    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    Configuration conf1 = conf.clone();
+    conf1.setString(FlinkOptions.OPERATION, "BULK_INSERT");
+    // start pipeline1 and bulk insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<RowData> dataset1 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), StringData.fromString("Danny"), null,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+    TestHarness pipeline1 = preparePipeline(conf1)
+        .consume(dataset1);
+
+    // start pipeline2 and insert record: [id1,null,23,2,par1], suspend the tx commit
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    List<RowData> dataset2 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), null, 23,
+            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+    TestHarness pipeline2 = preparePipeline(conf2)
+        .consume(dataset2);
+
+    // step to commit the 1st txn
+    pipeline1.endInput();
+
+    // step to commit the 2nd txn
+    pipeline2.checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1);
+
+    // snapshot result is [(id1,Danny,23,2,par1)] after the two writers finish committing
+    Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    pipeline2.checkWrittenData(tmpSnapshotResult, 1);
+
+    // schedule compaction outside of all writers
+    try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+      Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
+      assertNotNull(scheduleInstant.get());
+    }
+
+    // step to commit the 3rd txn
+    // it also triggers the inline compactor
+    List<RowData> dataset3 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+            TimestampData.fromEpochMillis(4), StringData.fromString("par1")));
+    pipeline2.consume(dataset3)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2);
+
+    // snapshot read result is [(id1,Danny,23,2,par1), (id3,Julian,53,4,par1)] after the three commits finish
+    Map<String, String> finalSnapshotResult = Collections.singletonMap(
+        "par1",
+        "[id1,par1,id1,Danny,23,2,par1, id3,par1,id3,Julian,53,4,par1]");
+    pipeline2.checkWrittenData(finalSnapshotResult, 1);
+    // read optimized result is [(id1,Danny,23,2,par1)]
+    // because the data files belonging to the 3rd commit are not included in the last compaction.
+    Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
new file mode 100644
index 0000000..92f8f6d
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.TestStreamConfigs;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperator;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class to manipulate the {@link BulkInsertWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
+  private final Configuration conf;
+  private final RowType rowType;
+  private final RowType rowTypeWithFileId;
+
+  private final IOManager ioManager;
+  private final MockStreamingRuntimeContext runtimeContext;
+  private final MockOperatorEventGateway gateway;
+  private final MockOperatorCoordinatorContext coordinatorContext;
+  private final StreamWriteOperatorCoordinator coordinator;
+  private final boolean needSortInput;
+
+  private BulkInsertWriteFunction<RowData> writeFunction;
+  private MapFunction<RowData, RowData> mapFunction;
+  private Map<String, String> bucketIdToFileId;
+  private SortOperator sortOperator;
+  private CollectorOutput<RowData> output;
+
+  public BulkInsertFunctionWrapper(String tablePath, Configuration conf) throws Exception {
+    ioManager = new IOManagerAsync();
+    MockEnvironment environment = new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .setIOManager(ioManager)
+        .build();
+    this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+    this.gateway = new MockOperatorEventGateway();
+    this.conf = conf;
+    this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+    this.rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+    this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+    this.needSortInput = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
+  }
+
+  public void openFunction() throws Exception {
+    this.coordinator.start();
+    this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+    setupWriteFunction();
+    setupMapFunction();
+    if (needSortInput) {
+      setupSortOperator();
+    }
+  }
+
+  public void invoke(I record) throws Exception {
+    RowData recordWithFileId = mapFunction.map((RowData) record);
+    if (needSortInput) {
+      // Sort the input first; the write function is triggered in #endInput
+      sortOperator.processElement(new StreamRecord(recordWithFileId));
+    } else {
+      writeFunction.processElement(recordWithFileId, null, null);
+    }
+  }
+
+  public WriteMetadataEvent[] getEventBuffer() {
+    return this.coordinator.getEventBuffer();
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.gateway.getNextEvent();
+  }
+
+  public void checkpointFunction(long checkpointId) {
+    // Do nothing
+  }
+
+  @Override
+  public void endInput() {
+    if (needSortInput) {
+      // sort all inputs of the SortOperator and flush them to the write function
+      try {
+        sortOperator.endInput();
+        List<RowData> sortedRecords = output.getRecords();
+        for (RowData record : sortedRecords) {
+          writeFunction.processElement(record, null, null);
+        }
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    }
+    writeFunction.endInput();
+    if (bucketIdToFileId != null) {
+      this.bucketIdToFileId.clear();
+    }
+  }
+
+  public void checkpointComplete(long checkpointId) {
+    coordinator.notifyCheckpointComplete(checkpointId);
+  }
+
+  public void coordinatorFails() throws Exception {
+    this.coordinator.close();
+    this.coordinator.start();
+    this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+  }
+
+  public void checkpointFails(long checkpointId) {
+    coordinator.notifyCheckpointAborted(checkpointId);
+  }
+
+  public StreamWriteOperatorCoordinator getCoordinator() {
+    return coordinator;
+  }
+
+  public MockOperatorCoordinatorContext getCoordinatorContext() {
+    return coordinatorContext;
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.coordinator.close();
+    this.ioManager.close();
+    this.writeFunction.close();
+    if (this.bucketIdToFileId != null) {
+      this.bucketIdToFileId.clear();
+    }
+    if (needSortInput) {
+      this.sortOperator.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void setupWriteFunction() throws Exception {
+    writeFunction = new BulkInsertWriteFunction<>(conf, rowType);
+    writeFunction.setRuntimeContext(runtimeContext);
+    writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.open(conf);
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
+  }
+
+  private void setupMapFunction() {
+    RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+    String indexKeys = OptionsResolver.getIndexKeyField(conf);
+    int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+    boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
+    this.bucketIdToFileId = new HashMap<>();
+    this.mapFunction = r -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r, indexKeys, numBuckets, needFixedFileIdSuffix);
+  }
+
+  private void setupSortOperator() throws Exception {
+    MockEnvironment environment = new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(12 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .setIOManager(ioManager)
+        .build();
+    StreamTask<?, ?> streamTask = new MockStreamTaskBuilder(environment)
+        .setConfig(new StreamConfig(conf))
+        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
+        .build();
+    SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+    this.sortOperator = (SortOperator) sortOperatorGen.createSortOperator(conf);
+    this.sortOperator.setProcessingTimeService(new TestProcessingTimeService());
+    this.output = new CollectorOutput<>();
+    StreamConfig streamConfig = new StreamConfig(conf);
+    streamConfig.setOperatorID(new OperatorID());
+    RowDataSerializer inputSerializer = new RowDataSerializer(rowTypeWithFileId);
+    TestStreamConfigs.setupNetworkInputs(streamConfig, inputSerializer);
+    streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, .99);
+    this.sortOperator.setup(streamTask, streamConfig, output);
+    this.sortOperator.open();
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 1b301cb..b2208b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -306,6 +306,31 @@
     }
 
     /**
+     * Flushes the data and commits using endInput, asserting that the commit fails.
+     */
+    public void endInputThrows(Class<?> cause, String msg) {
+      // this triggers the data write and event send
+      this.pipeline.endInput();
+      final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+      this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+      assertTrue(this.pipeline.getCoordinatorContext().isJobFailed(), "Job should have been failed");
+      Throwable throwable = this.pipeline.getCoordinatorContext().getJobFailureReason().getCause();
+      assertThat(throwable, instanceOf(cause));
+      assertThat(throwable.getMessage(), containsString(msg));
+    }
+
+    /**
+     * Flushes the data and commits using endInput.
+     */
+    public TestHarness endInput() {
+      // this triggers the data write and event send
+      this.pipeline.endInput();
+      final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+      this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+      return this;
+    }
+
+    /**
      * Asserts the checkpoint with id {@code checkpointId} throws when completes .
      */
     public TestHarness checkpointCompleteThrows(long checkpointId, Class<?> cause, String msg) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index afaf360..732065c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -34,6 +34,7 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.BulkInsertFunctionWrapper;
 import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper;
 import org.apache.hudi.sink.utils.InsertFunctionWrapper;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
@@ -553,7 +554,9 @@
    * Initializes a writing pipeline with given configuration.
    */
   public static TestFunctionWrapper<RowData> getWritePipeline(String basePath, Configuration conf) throws Exception {
-    if (OptionsResolver.isAppendMode(conf)) {
+    if (OptionsResolver.isBulkInsertOperation(conf)) {
+      return new BulkInsertFunctionWrapper<>(basePath, conf);
+    } else if (OptionsResolver.isAppendMode(conf)) {
       return new InsertFunctionWrapper<>(basePath, conf);
     } else if (OptionsResolver.isBucketIndexType(conf)) {
       if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..4b62c79
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..4b62c79
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..4b62c79
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..a7a620b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+    // Since Flink 1.16, serializeAllConfigs needs to be called to serialize all object configs synchronously.
+    // See https://issues.apache.org/jira/browse/FLINK-26675.
+    streamConfig.serializeAllConfigs();
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..a7a620b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+    // Since Flink 1.16, serializeAllConfigs needs to be called to serialize all object configs synchronously.
+    // See https://issues.apache.org/jira/browse/FLINK-26675.
+    streamConfig.serializeAllConfigs();
+  }
+}