[HUDI-6962] Fix the conflicts resolution for bulk insert under NB-CC (#9896)
* Flink bulk_insert now writes with a fixed file group id suffix when NB-CC is enabled;
* The bulk_insert writer should resolve conflicts with other writers under the OCC strategy.
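
For context, here is a condensed sketch of the conflict-resolution decision this patch introduces. It mirrors the HoodieWriteConfig#needResolveWriteConflict method in the diff below; the standalone helper class name is illustrative only and it assumes the WriteConcurrencyMode and WriteOperationType enums from hudi-common are on the classpath:

    import org.apache.hudi.common.model.WriteConcurrencyMode;
    import org.apache.hudi.common.model.WriteOperationType;

    final class ConflictResolutionSketch {
      // Mirrors HoodieWriteConfig#needResolveWriteConflict added by this patch.
      static boolean needResolveWriteConflict(WriteConcurrencyMode mode,
                                              boolean nonBlockingConcurrencyControl,
                                              WriteOperationType operationType) {
        if (!mode.supportsOptimisticConcurrencyControl()) {
          // SINGLE_WRITER: there are no concurrent writers, so nothing to resolve.
          return false;
        }
        if (!nonBlockingConcurrencyControl) {
          // Plain OCC: every commit must resolve conflicts against concurrent writers.
          return true;
        }
        // NB-CC: only bulk_insert still needs to resolve write conflicts; other operations skip it.
        return operationType == WriteOperationType.BULK_INSERT;
      }
    }

In the actual code the concurrency mode and operation type are read from the write config and the commit metadata, as the TransactionUtils change below shows.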
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index 15f6be8..1bea517 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -21,6 +21,7 @@
import org.apache.hudi.client.transaction.ConcurrentOperation;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -67,8 +68,8 @@
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
boolean reloadActiveTimeline,
Set<String> pendingInstants) throws HoodieWriteConflictException {
- // Skip to resolve conflict if using non-blocking concurrency control
- if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() && !config.isNonBlockingConcurrencyControl()) {
+ WriteOperationType operationType = thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null);
+ if (config.needResolveWriteConflict(operationType)) {
// deal with pendingInstants
Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c9e9b94..8c08bea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -46,6 +46,7 @@
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.RecordPayloadType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.marker.MarkerType;
@@ -2616,6 +2617,16 @@
return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}
+ public boolean needResolveWriteConflict(WriteOperationType operationType) {
+ if (getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ // Under NB-CC, only the bulk insert operation needs to resolve write conflicts
+ return WriteOperationType.BULK_INSERT == operationType || !isNonBlockingConcurrencyControl();
+ } else {
+ // The SINGLE_WRITER case does not need to resolve write conflicts
+ return false;
+ }
+ }
+
public boolean isNonBlockingConcurrencyControl() {
return getTableType().equals(HoodieTableType.MERGE_ON_READ)
&& getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index e412678..a52e547 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -302,8 +302,8 @@
* Refresh the last transaction metadata,
* should be called before the Driver starts a new transaction.
*/
- public void preTxn(HoodieTableMetaClient metaClient) {
- if (txnManager.isLockRequired() && !config.isNonBlockingConcurrencyControl()) {
+ public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient) {
+ if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType)) {
// refresh the meta client which is reused
metaClient.reloadActiveTimeline();
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 34fb5cf..860dffe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -387,7 +387,7 @@
private void startInstant() {
// refresh the last txn metadata
- this.writeClient.preTxn(this.metaClient);
+ this.writeClient.preTxn(tableState.operationType, this.metaClient);
// put the assignment in front of metadata generation,
// because the instant request from write task is asynchronous.
this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index fcf38e1..c25a69d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -92,16 +92,22 @@
return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
}
- private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
+ private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) {
String recordKey = keyGen.getRecordKey(record);
String partition = keyGen.getPartitionPath(record);
final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
String bucketId = partition + bucketNum;
- return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum));
+ return bucketIdToFileId.computeIfAbsent(bucketId, k -> {
+ if (needFixedFileIdSuffix) {
+ return BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum);
+ } else {
+ return BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+ }
+ });
}
- public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
- final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets);
+ public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) {
+ final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets, needFixedFileIdSuffix);
return GenericRowData.of(StringData.fromString(fileId), record);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index d44ef25..dfd8e0e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -117,11 +117,11 @@
this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
this.initInstant = lastPendingInstant();
sendBootstrapEvent();
- initWriterHelper();
}
@Override
public void processElement(I value, Context ctx, Collector<Object> out) throws IOException {
+ initWriterHelperIfNeeded();
this.writerHelper.write((RowData) value);
}
@@ -136,6 +136,7 @@
* End input action for batch source.
*/
public void endInput() {
+ initWriterHelperIfNeeded();
final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
final WriteMetadataEvent event = WriteMetadataEvent.builder()
@@ -165,11 +166,13 @@
// Utilities
// -------------------------------------------------------------------------
- private void initWriterHelper() {
- String instant = instantToWrite();
- this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
- instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
- this.rowType);
+ private void initWriterHelperIfNeeded() {
+ if (writerHelper == null) {
+ String instant = instantToWrite();
+ this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+ instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+ this.rowType);
+ }
}
private void sendBootstrapEvent() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index cb9344f..e66009a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -124,10 +124,11 @@
RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
+ boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
Map<String, String> bucketIdToFileId = new HashMap<>();
dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
- .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+ .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets, needFixedFileIdSuffix), typeInfo)
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 79320e1..816c9a4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -25,6 +25,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.TestData;
@@ -228,6 +229,121 @@
TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
}
+ // case 1: txn1 is an upsert writer, txn2 is a bulk_insert writer.
+ // |----------- txn1 -----------|
+ // |----- txn2 ------|
+ // txn2 should fail to commit because of the write conflict
+ @Test
+ public void testBulkInsertInMultiWriter() throws Exception {
+ conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+ conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
+ // disable schedule compaction in writers
+ conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+ // start pipeline1 and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<RowData> dataset1 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id1"), StringData.fromString("Danny"), null,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ TestHarness pipeline1 = preparePipeline(conf)
+ .consume(dataset1)
+ .assertEmptyDataFiles();
+
+ // start pipeline2 and bulk insert record: [id1,null,23,1,par1], suspend the tx commit
+ Configuration conf2 = conf.clone();
+ conf2.setString(FlinkOptions.OPERATION, "BULK_INSERT");
+ conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+ List<RowData> dataset2 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id1"), null, 23,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+ TestHarness pipeline2 = preparePipeline(conf2)
+ .consume(dataset2);
+
+ // step to commit the 1st txn
+ pipeline1.checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1);
+
+ // step to commit the 2nd txn, should throw exception
+ pipeline2.endInputThrows(HoodieWriteConflictException.class, "Cannot resolve conflicts");
+ }
+
+ // case 2: txn1 is a bulk_insert writer, txn2 is an upsert writer.
+ // |----- txn1 ------|
+ // |----------- txn2 -----------|
+ // both txns should commit successfully
+ @Test
+ public void testBulkInsertInSequence() throws Exception {
+ conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+ conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
+ // disable schedule compaction in writers
+ conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+ Configuration conf1 = conf.clone();
+ conf1.setString(FlinkOptions.OPERATION, "BULK_INSERT");
+ // start pipeline1 and bulk insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<RowData> dataset1 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id1"), StringData.fromString("Danny"), null,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ TestHarness pipeline1 = preparePipeline(conf1)
+ .consume(dataset1);
+
+ // start pipeline2 and insert record: [id1,null,23,2,par1], suspend the tx commit
+ Configuration conf2 = conf.clone();
+ conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+ List<RowData> dataset2 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id1"), null, 23,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+ TestHarness pipeline2 = preparePipeline(conf2)
+ .consume(dataset2);
+
+ // step to commit the 1st txn
+ pipeline1.endInput();
+
+ // step to commit the 2nd txn
+ pipeline2.checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1);
+
+ // snapshot result is [(id1,Danny,23,2,par1)] after both writers finish committing
+ Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+ pipeline2.checkWrittenData(tmpSnapshotResult, 1);
+
+ // schedule compaction outside of all writers
+ try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+ Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
+ assertNotNull(scheduleInstant.get());
+ }
+
+ // step to commit the 3rd txn
+ // it also triggers the inline compactor
+ List<RowData> dataset3 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")));
+ pipeline2.consume(dataset3)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2);
+
+ // snapshot read result is [(id1,Danny,23,2,par1), (id3,Julian,53,4,par1)] after all three commits finish
+ Map<String, String> finalSnapshotResult = Collections.singletonMap(
+ "par1",
+ "[id1,par1,id1,Danny,23,2,par1, id3,par1,id3,Julian,53,4,par1]");
+ pipeline2.checkWrittenData(finalSnapshotResult, 1);
+ // read-optimized read result is [(id1,Danny,23,2,par1)]
+ // because the data files belonging to the 3rd commit are not included in the last compaction.
+ Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+ TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+ }
+
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
new file mode 100644
index 0000000..92f8f6d
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.TestStreamConfigs;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperator;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class to manipulate the {@link BulkInsertWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
+ private final Configuration conf;
+ private final RowType rowType;
+ private final RowType rowTypeWithFileId;
+
+ private final IOManager ioManager;
+ private final MockStreamingRuntimeContext runtimeContext;
+ private final MockOperatorEventGateway gateway;
+ private final MockOperatorCoordinatorContext coordinatorContext;
+ private final StreamWriteOperatorCoordinator coordinator;
+ private final boolean needSortInput;
+
+ private BulkInsertWriteFunction<RowData> writeFunction;
+ private MapFunction<RowData, RowData> mapFunction;
+ private Map<String, String> bucketIdToFileId;
+ private SortOperator sortOperator;
+ private CollectorOutput<RowData> output;
+
+ public BulkInsertFunctionWrapper(String tablePath, Configuration conf) throws Exception {
+ ioManager = new IOManagerAsync();
+ MockEnvironment environment = new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .setIOManager(ioManager)
+ .build();
+ this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+ this.gateway = new MockOperatorEventGateway();
+ this.conf = conf;
+ this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+ this.rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+ this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+ this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+ this.needSortInput = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
+ }
+
+ public void openFunction() throws Exception {
+ this.coordinator.start();
+ this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+ setupWriteFunction();
+ setupMapFunction();
+ if (needSortInput) {
+ setupSortOperator();
+ }
+ }
+
+ public void invoke(I record) throws Exception {
+ RowData recordWithFileId = mapFunction.map((RowData) record);
+ if (needSortInput) {
+ // Sort the input first; the write function is triggered in #endInput
+ sortOperator.processElement(new StreamRecord(recordWithFileId));
+ } else {
+ writeFunction.processElement(recordWithFileId, null, null);
+ }
+ }
+
+ public WriteMetadataEvent[] getEventBuffer() {
+ return this.coordinator.getEventBuffer();
+ }
+
+ public OperatorEvent getNextEvent() {
+ return this.gateway.getNextEvent();
+ }
+
+ public void checkpointFunction(long checkpointId) {
+ // Do nothing
+ }
+
+ @Override
+ public void endInput() {
+ if (needSortInput) {
+ // sort all inputs buffered in the SortOperator and flush them to the write function
+ try {
+ sortOperator.endInput();
+ List<RowData> sortedRecords = output.getRecords();
+ for (RowData record : sortedRecords) {
+ writeFunction.processElement(record, null, null);
+ }
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+ writeFunction.endInput();
+ if (bucketIdToFileId != null) {
+ this.bucketIdToFileId.clear();
+ }
+ }
+
+ public void checkpointComplete(long checkpointId) {
+ coordinator.notifyCheckpointComplete(checkpointId);
+ }
+
+ public void coordinatorFails() throws Exception {
+ this.coordinator.close();
+ this.coordinator.start();
+ this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+ }
+
+ public void checkpointFails(long checkpointId) {
+ coordinator.notifyCheckpointAborted(checkpointId);
+ }
+
+ public StreamWriteOperatorCoordinator getCoordinator() {
+ return coordinator;
+ }
+
+ public MockOperatorCoordinatorContext getCoordinatorContext() {
+ return coordinatorContext;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.coordinator.close();
+ this.ioManager.close();
+ this.writeFunction.close();
+ if (this.bucketIdToFileId != null) {
+ this.bucketIdToFileId.clear();
+ }
+ if (needSortInput) {
+ this.sortOperator.close();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void setupWriteFunction() throws Exception {
+ writeFunction = new BulkInsertWriteFunction<>(conf, rowType);
+ writeFunction.setRuntimeContext(runtimeContext);
+ writeFunction.setOperatorEventGateway(gateway);
+ writeFunction.open(conf);
+ // handle the bootstrap event
+ coordinator.handleEventFromOperator(0, getNextEvent());
+ }
+
+ private void setupMapFunction() {
+ RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+ String indexKeys = OptionsResolver.getIndexKeyField(conf);
+ int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+ boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(conf);
+ this.bucketIdToFileId = new HashMap<>();
+ this.mapFunction = r -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r, indexKeys, numBuckets, needFixedFileIdSuffix);
+ }
+
+ private void setupSortOperator() throws Exception {
+ MockEnvironment environment = new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setManagedMemorySize(12 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .setIOManager(ioManager)
+ .build();
+ StreamTask<?, ?> streamTask = new MockStreamTaskBuilder(environment)
+ .setConfig(new StreamConfig(conf))
+ .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
+ .build();
+ SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+ this.sortOperator = (SortOperator) sortOperatorGen.createSortOperator(conf);
+ this.sortOperator.setProcessingTimeService(new TestProcessingTimeService());
+ this.output = new CollectorOutput<>();
+ StreamConfig streamConfig = new StreamConfig(conf);
+ streamConfig.setOperatorID(new OperatorID());
+ RowDataSerializer inputSerializer = new RowDataSerializer(rowTypeWithFileId);
+ TestStreamConfigs.setupNetworkInputs(streamConfig, inputSerializer);
+ streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, .99);
+ this.sortOperator.setup(streamTask, streamConfig, output);
+ this.sortOperator.open();
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 1b301cb..b2208b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -306,6 +306,31 @@
}
/**
+ * Flush data and commit using endInput. Asserts that the commit fails.
+ */
+ public void endInputThrows(Class<?> cause, String msg) {
+ // this triggers the data write and event send
+ this.pipeline.endInput();
+ final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+ this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+ assertTrue(this.pipeline.getCoordinatorContext().isJobFailed(), "Job should have failed");
+ Throwable throwable = this.pipeline.getCoordinatorContext().getJobFailureReason().getCause();
+ assertThat(throwable, instanceOf(cause));
+ assertThat(throwable.getMessage(), containsString(msg));
+ }
+
+ /**
+ * Flush data and commit using endInput.
+ */
+ public TestHarness endInput() {
+ // this triggers the data write and event send
+ this.pipeline.endInput();
+ final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+ this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+ return this;
+ }
+
+ /**
* Asserts the checkpoint with id {@code checkpointId} throws when it completes.
*/
public TestHarness checkpointCompleteThrows(long checkpointId, Class<?> cause, String msg) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index afaf360..732065c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -34,6 +34,7 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.BulkInsertFunctionWrapper;
import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper;
import org.apache.hudi.sink.utils.InsertFunctionWrapper;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
@@ -553,7 +554,9 @@
* Initializes a writing pipeline with given configuration.
*/
public static TestFunctionWrapper<RowData> getWritePipeline(String basePath, Configuration conf) throws Exception {
- if (OptionsResolver.isAppendMode(conf)) {
+ if (OptionsResolver.isBulkInsertOperation(conf)) {
+ return new BulkInsertFunctionWrapper<>(basePath, conf);
+ } else if (OptionsResolver.isAppendMode(conf)) {
return new InsertFunctionWrapper<>(basePath, conf);
} else if (OptionsResolver.isBucketIndexType(conf)) {
if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..4b62c79
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+ public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+ streamConfig.setupNetworkInputs(inputSerializers);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..4b62c79
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+ public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+ streamConfig.setupNetworkInputs(inputSerializers);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..4b62c79
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+ public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+ streamConfig.setupNetworkInputs(inputSerializers);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..a7a620b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+ public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+ streamConfig.setupNetworkInputs(inputSerializers);
+ // Since Flink 1.16, serializeAllConfigs needs to be called so that all object configs are serialized synchronously.
+ // See https://issues.apache.org/jira/browse/FLINK-26675.
+ streamConfig.serializeAllConfigs();
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 0000000..a7a620b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig utilities for tests.
+ */
+public class TestStreamConfigs {
+
+ public static void setupNetworkInputs(StreamConfig streamConfig, TypeSerializer<?>... inputSerializers) {
+ streamConfig.setupNetworkInputs(inputSerializers);
+ // Since Flink 1.16, serializeAllConfigs needs to be called so that all object configs are serialized synchronously.
+ // See https://issues.apache.org/jira/browse/FLINK-26675.
+ streamConfig.serializeAllConfigs();
+ }
+}