[HUDI-2087] Support Append only in Flink stream (#3390)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
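
A minimal sketch of how the new append-only path is expected to be enabled from a Flink job configuration, based on the `write.insert.allow_dup` option and the `StreamerUtil.allowDuplicateInserts` helper added in this patch (the base path and table name below are placeholders, not part of the change):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

public class AppendOnlyConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // placeholder path and table name, adjust for a real job
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi/append_only_table");
    conf.setString(FlinkOptions.TABLE_NAME, "append_only_table");
    // append mode is only supported for COPY_ON_WRITE tables with the INSERT operation
    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
    conf.setString(FlinkOptions.OPERATION, "insert");
    // 'write.insert.allow_dup' defaults to true, set it explicitly for clarity
    conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, true);

    // the helper resolves the effective behavior: true only when the
    // operation is INSERT and the option is enabled
    boolean appendOnly = StreamerUtil.allowDuplicateInserts(conf);
    System.out.println("append-only write enabled: " + appendOnly);
  }
}
```

When the flag resolves to true, the bucket assigner ignores existing small files and the write client always uses `FlinkCreateHandle`, so incoming INSERT records are written to new base files instead of being merged into existing file groups.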
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 07b419e..5b751e4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -437,6 +437,12 @@
     final HoodieRecordLocation loc = record.getCurrentLocation();
     final String fileID = loc.getFileId();
     final String partitionPath = record.getPartitionPath();
+    // Always use FlinkCreateHandle when duplicate inserts are allowed
+    if (config.allowDuplicateInserts()) {
+      return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
+          fileID, table.getTaskContextSupplier());
+    }
+
     if (bucketToHandles.containsKey(fileID)) {
       MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
       if (lastHandle.shouldReplace()) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index f839b5e..34b050c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -209,6 +209,12 @@
       .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
       .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
 
+  public static final ConfigOption<Boolean> INSERT_ALLOW_DUP = ConfigOptions
+          .key("write.insert.allow_dup")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription("Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly, default true");
+
   public static final ConfigOption<String> OPERATION = ConfigOptions
       .key("write.operation")
       .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 688cf1b..e6c59b1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -91,8 +91,6 @@
 
   private final Configuration conf;
 
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
   private final boolean isChangingRecords;
 
   /**
@@ -117,21 +115,25 @@
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    this.hadoopConf = StreamerUtil.getHadoopConf();
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
-        new SerializableConfiguration(this.hadoopConf),
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
         new FlinkTaskContextSupplier(getRuntimeContext()));
     this.bucketAssigner = BucketAssigners.create(
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
-        WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
+        ignoreSmallFiles(writeConfig),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
     this.payloadCreation = PayloadCreation.instance(this.conf);
   }
 
+  private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
+  }
+
   @Override
   public void snapshotState(FunctionSnapshotContext context) {
     this.bucketAssigner.reset();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index 8d304db..13d4587 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -35,25 +35,25 @@
   /**
    * Creates a {@code BucketAssigner}.
    *
-   * @param taskID         The task ID
-   * @param maxParallelism The max parallelism
-   * @param numTasks       The number of tasks
-   * @param overwrite      Whether the write operation is OVERWRITE
-   * @param tableType      The table type
-   * @param context        The engine context
-   * @param config         The configuration
+   * @param taskID           The task ID
+   * @param maxParallelism   The max parallelism
+   * @param numTasks         The number of tasks
+   * @param ignoreSmallFiles Whether to ignore the small files
+   * @param tableType        The table type
+   * @param context          The engine context
+   * @param config           The configuration
    * @return the bucket assigner instance
    */
   public static BucketAssigner create(
       int taskID,
       int maxParallelism,
       int numTasks,
-      boolean overwrite,
+      boolean ignoreSmallFiles,
       HoodieTableType tableType,
       HoodieFlinkEngineContext context,
       HoodieWriteConfig config) {
     boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
-    WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
+    WriteProfile writeProfile = WriteProfiles.singleton(ignoreSmallFiles, delta, config, context);
     return new BucketAssigner(taskID, maxParallelism, numTasks, writeProfile, config);
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
similarity index 74%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
index 8b08446..3cdd798 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
@@ -26,13 +26,18 @@
 import java.util.List;
 
 /**
- * WriteProfile for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
- * this WriteProfile always skip the existing small files because of the 'OVERWRITE' semantics.
+ * WriteProfile that always returns an empty list of small files.
+ *
+ * <p>This write profile is used in the following cases:
+ * i). INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
+ * where the existing small files are ignored because of the 'OVERWRITE' semantics;
+ * ii). INSERT operation when data file merge is disabled.
+ *
  *
  * <p>Note: assumes the index can always index log files for Flink write.
  */
-public class OverwriteWriteProfile extends WriteProfile {
-  public OverwriteWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
+public class EmptyWriteProfile extends WriteProfile {
+  public EmptyWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
     super(config, context);
   }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 093cef5..1277b19 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -53,22 +53,22 @@
 
   private WriteProfiles() {}
 
-  public static synchronized  WriteProfile singleton(
-      boolean overwrite,
+  public static synchronized WriteProfile singleton(
+      boolean ignoreSmallFiles,
       boolean delta,
       HoodieWriteConfig config,
       HoodieFlinkEngineContext context) {
     return PROFILES.computeIfAbsent(config.getBasePath(),
-        k -> getWriteProfile(overwrite, delta, config, context));
+        k -> getWriteProfile(ignoreSmallFiles, delta, config, context));
   }
 
   private static WriteProfile getWriteProfile(
-      boolean overwrite,
+      boolean ignoreSmallFiles,
       boolean delta,
       HoodieWriteConfig config,
       HoodieFlinkEngineContext context) {
-    if (overwrite) {
-      return new OverwriteWriteProfile(config, context);
+    if (ignoreSmallFiles) {
+      return new EmptyWriteProfile(config, context);
     } else if (delta) {
       return new DeltaWriteProfile(config, context);
     } else {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 4597d09..a7ef14f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -69,6 +69,9 @@
   @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
   public String tableType;
 
+  @Parameter(names = {"--insert-allow-dup"}, description = "Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly.", required = true)
+  public Boolean insertAllowDup = true;
+
   @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -305,6 +308,7 @@
     conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
     // copy_on_write works same as COPY_ON_WRITE
     conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
+    conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, config.insertAllowDup);
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a8fe93b..d3c5388 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -62,7 +62,7 @@
 
     Configuration conf = (Configuration) helper.getOptions();
     TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-    validateRequiredFields(conf, schema);
+    sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
 
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
@@ -79,7 +79,7 @@
   public DynamicTableSink createDynamicTableSink(Context context) {
     Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
     TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-    validateRequiredFields(conf, schema);
+    sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
@@ -103,12 +103,13 @@
   //  Utilities
   // -------------------------------------------------------------------------
 
-  /** Validate required options. For e.g, record key and pre_combine key.
+  /**
+   * Performs a sanity check on the table options, e.g. the required fields and the write option compatibility.
    *
    * @param conf The table options
    * @param schema The table schema
    */
-  private void validateRequiredFields(Configuration conf, TableSchema schema) {
+  private void sanityCheck(Configuration conf, TableSchema schema) {
     List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
 
     // validate record key in pk absence.
@@ -128,6 +129,12 @@
       throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
           + "Please check 'write.precombine.field' option.");
     }
+
+    if (conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase().equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        && conf.getString(FlinkOptions.OPERATION).equalsIgnoreCase("insert")
+        && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP)) {
+      throw new ValidationException("Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
+    }
   }
 
   /**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d42993b..7967b69 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
@@ -145,6 +146,7 @@
             .withEngineType(EngineType.FLINK)
             .withPath(conf.getString(FlinkOptions.PATH))
             .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+            .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf))
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
                     .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
@@ -345,4 +347,9 @@
       throw new IOException("Could not load transformer class(es) " + classNames, e);
     }
   }
+
+  public static boolean allowDuplicateInserts(Configuration conf) {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    return operationType == WriteOperationType.INSERT && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP);
+  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 7e060f7..29a7455 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -533,6 +534,81 @@
   }
 
   @Test
+  public void testInsertAllowsDuplication() throws Exception {
+    // reset the config option
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // Each record is 208 bytes, so 4 records are expected to trigger a mini-batch write
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+    final OperatorEvent event1 = funcWrapper.getNextEvent(); // poll the first (mini-batch) event
+    final OperatorEvent event2 = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    String instant = funcWrapper.getWriteClient()
+            .getLastPendingInstant(getTableType());
+
+    funcWrapper.checkpointComplete(1);
+
+    Map<String, String> expected = new HashMap<>();
+
+    expected.put("par1", "["
+        + "id1,par1,id1,Danny,23,0,par1, "
+        + "id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,2,par1, "
+        + "id1,par1,id1,Danny,23,3,par1, "
+        + "id1,par1,id1,Danny,23,4,par1]");
+
+    TestData.checkWrittenAllData(tempFile, expected, 1);
+
+    // started a new instant already
+    checkInflightInstant(funcWrapper.getWriteClient());
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    // insert duplicates again
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    final OperatorEvent event3 = funcWrapper.getNextEvent(); // poll the first (mini-batch) event
+    final OperatorEvent event4 = funcWrapper.getNextEvent();
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+    funcWrapper.checkpointComplete(2);
+
+    // every record now appears twice: the second batch is written as new base files alongside the original content.
+    expected.put("par1", "["
+        + "id1,par1,id1,Danny,23,0,par1, "
+        + "id1,par1,id1,Danny,23,0,par1, "
+        + "id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,2,par1, "
+        + "id1,par1,id1,Danny,23,2,par1, "
+        + "id1,par1,id1,Danny,23,3,par1, "
+        + "id1,par1,id1,Danny,23,3,par1, "
+        + "id1,par1,id1,Danny,23,4,par1, "
+        + "id1,par1,id1,Danny,23,4,par1]");
+    TestData.checkWrittenAllData(tempFile, expected, 1);
+  }
+
+  @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 07e23b5..fa4f92b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -37,6 +37,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Comparator;
@@ -67,6 +68,11 @@
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
   }
 
+  @Test
+  public void testInsertAllowsDuplication() {
+    // skip the test because only the COPY_ON_WRITE table supports INSERT duplication
+  }
+
   @Override
   protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
     HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 13a71ec..de54d90 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -22,7 +22,6 @@
 import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
@@ -39,10 +38,14 @@
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
   }
 
-  @Disabled
   @Test
-  public void testIndexStateBootstrap() {
-    // Ignore the index bootstrap because we only support parquet load now.
+  public void testInsertAllowsDuplication() {
+    // skip the test because only the COPY_ON_WRITE table supports INSERT duplication
+  }
+
+  @Override
+  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+    return EXPECTED1;
   }
 
   protected Map<String, String> getMiniBatchExpected() {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 799739c..21f1647 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -340,6 +340,24 @@
     assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
   }
 
+  @Test
+  void testMorTableInsertAllowDuplication() {
+    TableSchema schema = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    // overwrite the operation
+    this.conf.setString(FlinkOptions.OPERATION.key(), "insert");
+    this.conf.setString(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    final MockContext sinkContext = MockContext.getInstance(this.conf, schema, "f2");
+    assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sinkContext),
+        "Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 5ddb99c..f5ac9c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -395,6 +395,50 @@
   }
 
   /**
+   * Checks that the source data set is written as expected.
+   * Different from {@link #checkWrittenData}, it reads all the data files.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param baseFile   The base file path to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the partition path
+   *                   and the value should be the list of expected records in that partition
+   * @param partitions The expected number of partitions
+   */
+  public static void checkWrittenAllData(
+      File baseFile,
+      Map<String, String> expected,
+      int partitions) throws IOException {
+    assert baseFile.isDirectory();
+    FileFilter filter = file -> !file.getName().startsWith(".");
+    File[] partitionDirs = baseFile.listFiles(filter);
+
+    assertNotNull(partitionDirs);
+    assertThat(partitionDirs.length, is(partitions));
+
+    for (File partitionDir : partitionDirs) {
+      File[] dataFiles = partitionDir.listFiles(filter);
+      assertNotNull(dataFiles);
+
+      List<String> readBuffer = new ArrayList<>();
+      for (File dataFile : dataFiles) {
+        // use try-with-resources so that the parquet reader is always closed
+        try (ParquetReader<GenericRecord> reader = AvroParquetReader
+            .<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build()) {
+          GenericRecord nextRecord = reader.read();
+          while (nextRecord != null) {
+            readBuffer.add(filterOutVariables(nextRecord));
+            nextRecord = reader.read();
+          }
+        }
+      }
+
+      readBuffer.sort(Comparator.naturalOrder());
+      assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
+    }
+  }
+
+  /**
    * Checks the source data are written as expected.
    *
    * <p>Note: Replace it with the Flink reader when it is supported.