[HUDI-2087] Support Append only in Flink stream (#3390)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
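
Adds a new option 'write.insert.allow_dup' (default true, COPY_ON_WRITE only): when the
write operation is INSERT and the option is enabled, the records are written into new base
files directly with FlinkCreateHandle and the existing small files are ignored during bucket
assignment, so the write path becomes a pure append.

A minimal sketch of enabling the append-only path through the Flink options (the table path
below is illustrative):

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi_table");   // illustrative path
    conf.setString(FlinkOptions.OPERATION, "insert");       // only takes effect for INSERT
    conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, true);   // write base files directly, no small-file merge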
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 07b419e..5b751e4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -437,6 +437,12 @@
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
+ // Always use FlinkCreateHandle when duplicate inserts are allowed
+ if (config.allowDuplicateInserts()) {
+ return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
+ fileID, table.getTaskContextSupplier());
+ }
+
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index f839b5e..34b050c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -209,6 +209,12 @@
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
+ public static final ConfigOption<Boolean> INSERT_ALLOW_DUP = ConfigOptions
+ .key("write.insert.allow_dup")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly, default true");
+
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 688cf1b..e6c59b1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -91,8 +91,6 @@
private final Configuration conf;
- private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
private final boolean isChangingRecords;
/**
@@ -117,21 +115,25 @@
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- this.hadoopConf = StreamerUtil.getHadoopConf();
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
- new SerializableConfiguration(this.hadoopConf),
+ new SerializableConfiguration(StreamerUtil.getHadoopConf()),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create(
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getMaxNumberOfParallelSubtasks(),
getRuntimeContext().getNumberOfParallelSubtasks(),
- WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
+ ignoreSmallFiles(writeConfig),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
context,
writeConfig);
this.payloadCreation = PayloadCreation.instance(this.conf);
}
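+
+ /**
+ * Returns true when the existing small files should be ignored during bucket assignment,
+ * i.e. the write operation is INSERT OVERWRITE(_TABLE) or duplicate inserts are allowed.
+ */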
+ private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
+ WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+ return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
+ }
+
@Override
public void snapshotState(FunctionSnapshotContext context) {
this.bucketAssigner.reset();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index 8d304db..13d4587 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -35,25 +35,25 @@
/**
* Creates a {@code BucketAssigner}.
*
- * @param taskID The task ID
- * @param maxParallelism The max parallelism
- * @param numTasks The number of tasks
- * @param overwrite Whether the write operation is OVERWRITE
- * @param tableType The table type
- * @param context The engine context
- * @param config The configuration
+ * @param taskID The task ID
+ * @param maxParallelism The max parallelism
+ * @param numTasks The number of tasks
+ * @param ignoreSmallFiles Whether to ignore the small files
+ * @param tableType The table type
+ * @param context The engine context
+ * @param config The configuration
* @return the bucket assigner instance
*/
public static BucketAssigner create(
int taskID,
int maxParallelism,
int numTasks,
- boolean overwrite,
+ boolean ignoreSmallFiles,
HoodieTableType tableType,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
- WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
+ WriteProfile writeProfile = WriteProfiles.singleton(ignoreSmallFiles, delta, config, context);
return new BucketAssigner(taskID, maxParallelism, numTasks, writeProfile, config);
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
similarity index 74%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
index 8b08446..3cdd798 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/EmptyWriteProfile.java
@@ -26,13 +26,18 @@
import java.util.List;
/**
- * WriteProfile for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
- * this WriteProfile always skip the existing small files because of the 'OVERWRITE' semantics.
+ * WriteProfile that always returns an empty list of small files.
+ *
+ * <p>This write profile is used for two cases:
+ * i). INSERT OVERWRITE and INSERT OVERWRITE TABLE operations, where the existing
+ * small files are skipped because of the 'OVERWRITE' semantics;
+ * ii). INSERT operations when the data file merge is disabled, i.e. duplicate
+ * inserts are allowed.
*
* <p>Note: assumes the index can always index log files for Flink write.
*/
-public class OverwriteWriteProfile extends WriteProfile {
- public OverwriteWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
+public class EmptyWriteProfile extends WriteProfile {
+ public EmptyWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
super(config, context);
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 093cef5..1277b19 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -53,22 +53,22 @@
private WriteProfiles() {}
- public static synchronized WriteProfile singleton(
- boolean overwrite,
+ public static synchronized WriteProfile singleton(
+ boolean ignoreSmallFiles,
boolean delta,
HoodieWriteConfig config,
HoodieFlinkEngineContext context) {
return PROFILES.computeIfAbsent(config.getBasePath(),
- k -> getWriteProfile(overwrite, delta, config, context));
+ k -> getWriteProfile(ignoreSmallFiles, delta, config, context));
}
private static WriteProfile getWriteProfile(
- boolean overwrite,
+ boolean ignoreSmallFiles,
boolean delta,
HoodieWriteConfig config,
HoodieFlinkEngineContext context) {
- if (overwrite) {
- return new OverwriteWriteProfile(config, context);
+ if (ignoreSmallFiles) {
+ return new EmptyWriteProfile(config, context);
} else if (delta) {
return new DeltaWriteProfile(config, context);
} else {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 4597d09..a7ef14f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -69,6 +69,9 @@
@Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
public String tableType;
+ @Parameter(names = {"--insert-allow-dup"}, description = "Whether to allow data duplication for INSERT operation, if enabled, writes the base files directly.", required = true)
+ public Boolean insertAllowDup = true;
+
@Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
@@ -305,6 +308,7 @@
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
+ conf.setBoolean(FlinkOptions.INSERT_ALLOW_DUP, config.insertAllowDup);
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a8fe93b..d3c5388 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -62,7 +62,7 @@
Configuration conf = (Configuration) helper.getOptions();
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
- validateRequiredFields(conf, schema);
+ sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
@@ -79,7 +79,7 @@
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
- validateRequiredFields(conf, schema);
+ sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
}
@@ -103,12 +103,13 @@
// Utilities
// -------------------------------------------------------------------------
- /** Validate required options. For e.g, record key and pre_combine key.
+ /**
+ * Performs sanity checks on the table options, e.g. the record key, the pre-combine field and the duplicate insert option.
*
* @param conf The table options
* @param schema The table schema
*/
- private void validateRequiredFields(Configuration conf, TableSchema schema) {
+ private void sanityCheck(Configuration conf, TableSchema schema) {
List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
// validate record key in pk absence.
@@ -128,6 +129,11 @@
throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
+ "Please check 'write.precombine.field' option.");
}
+
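+ // validate the duplicate insert option: duplicate inserts are only supported for COPY_ON_WRITE tables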
+ if (conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase().equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+ && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP)) {
+ throw new ValidationException("Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
+ }
}
/**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d42993b..7967b69 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -27,6 +27,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
@@ -145,6 +146,7 @@
.withEngineType(EngineType.FLINK)
.withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+ .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf))
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
@@ -345,4 +347,9 @@
throw new IOException("Could not load transformer class(es) " + classNames, e);
}
}
+
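+ /**
+ * Returns whether duplicate inserts are allowed: the write operation is INSERT
+ * and the 'write.insert.allow_dup' option is enabled.
+ */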
+ public static boolean allowDuplicateInserts(Configuration conf) {
+ WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+ return operationType == WriteOperationType.INSERT && conf.getBoolean(FlinkOptions.INSERT_ALLOW_DUP);
+ }
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 7e060f7..29a7455 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -533,6 +534,81 @@
}
@Test
+ public void testInsertAllowsDuplication() throws Exception {
+ // reset the config option
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+ conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+ funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ // Each record is 208 bytes, so 4 records are expected to trigger a mini-batch write
+ for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+ funcWrapper.invoke(rowData);
+ }
+
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+ Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+ assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+ final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
+ final OperatorEvent event2 = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ String instant = funcWrapper.getWriteClient()
+ .getLastPendingInstant(getTableType());
+
+ funcWrapper.checkpointComplete(1);
+
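+ // with duplicate inserts enabled, all the records with the same key are kept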
+ Map<String, String> expected = new HashMap<>();
+
+ expected.put("par1", "["
+ + "id1,par1,id1,Danny,23,0,par1, "
+ + "id1,par1,id1,Danny,23,1,par1, "
+ + "id1,par1,id1,Danny,23,2,par1, "
+ + "id1,par1,id1,Danny,23,3,par1, "
+ + "id1,par1,id1,Danny,23,4,par1]");
+
+ TestData.checkWrittenAllData(tempFile, expected, 1);
+
+ // started a new instant already
+ checkInflightInstant(funcWrapper.getWriteClient());
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+ // insert duplicates again
+ for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+ funcWrapper.invoke(rowData);
+ }
+
+ funcWrapper.checkpointFunction(2);
+
+ final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
+ final OperatorEvent event4 = funcWrapper.getNextEvent();
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+ funcWrapper.checkpointComplete(2);
+
+ // the newly written base files duplicate the original base file content.
+ expected.put("par1", "["
+ + "id1,par1,id1,Danny,23,0,par1, "
+ + "id1,par1,id1,Danny,23,0,par1, "
+ + "id1,par1,id1,Danny,23,1,par1, "
+ + "id1,par1,id1,Danny,23,1,par1, "
+ + "id1,par1,id1,Danny,23,2,par1, "
+ + "id1,par1,id1,Danny,23,2,par1, "
+ + "id1,par1,id1,Danny,23,3,par1, "
+ + "id1,par1,id1,Danny,23,3,par1, "
+ + "id1,par1,id1,Danny,23,4,par1, "
+ + "id1,par1,id1,Danny,23,4,par1]");
+ TestData.checkWrittenAllData(tempFile, expected, 1);
+ }
+
+ @Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 07e23b5..fa4f92b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -37,6 +37,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Comparator;
@@ -67,6 +68,11 @@
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
+ @Test
+ public void testInsertAllowsDuplication() {
+ // ignore the test because only COW table supports INSERT duplication
+ }
+
@Override
protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 13a71ec..de54d90 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -22,7 +22,6 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@@ -39,10 +38,14 @@
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
}
- @Disabled
@Test
- public void testIndexStateBootstrap() {
- // Ignore the index bootstrap because we only support parquet load now.
+ public void testInsertAllowsDuplication() {
+ // ignore the test because only COW table supports INSERT duplication
+ }
+
+ @Override
+ protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+ return EXPECTED1;
}
protected Map<String, String> getMiniBatchExpected() {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 799739c..21f1647 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -340,6 +340,24 @@
assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
}
+ @Test
+ void testMorTableInsertAllowDuplication() {
+ TableSchema schema = TableSchema.builder()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+ // overwrite the operation
+ this.conf.setString(FlinkOptions.OPERATION.key(), "insert");
+ this.conf.setString(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+ final MockContext sinkContext = MockContext.getInstance(this.conf, schema, "f2");
+ assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sinkContext),
+ "Option 'write.insert.allow_dup' is only allowed for COPY_ON_WRITE table.");
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 5ddb99c..f5ac9c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -395,6 +395,48 @@
}
/**
+ * Checks that the source data set is written as expected.
+ * Different from {@link #checkWrittenData}, it reads all the data files.
+ *
+ * <p>Note: Replace it with the Flink reader when it is supported.
+ *
+ * @param baseFile The base file path to check, should be a directory
+ * @param expected The expected results mapping, the key should be the partition path
+ * and the value should be the list of the written values under that partition
+ * @param partitions The expected partition number
+ */
+ public static void checkWrittenAllData(
+ File baseFile,
+ Map<String, String> expected,
+ int partitions) throws IOException {
+ assert baseFile.isDirectory();
+ FileFilter filter = file -> !file.getName().startsWith(".");
+ File[] partitionDirs = baseFile.listFiles(filter);
+
+ assertNotNull(partitionDirs);
+ assertThat(partitionDirs.length, is(partitions));
+
+ for (File partitionDir : partitionDirs) {
+ File[] dataFiles = partitionDir.listFiles(filter);
+ assertNotNull(dataFiles);
+
+ List<String> readBuffer = new ArrayList<>();
+ for (File dataFile : dataFiles) {
+ ParquetReader<GenericRecord> reader = AvroParquetReader
+ .<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build();
+ GenericRecord nextRecord = reader.read();
+ while (nextRecord != null) {
+ readBuffer.add(filterOutVariables(nextRecord));
+ nextRecord = reader.read();
+ }
+ }
+
+ readBuffer.sort(Comparator.naturalOrder());
+ assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
+ }
+ }
+
+ /**
* Checks the source data are written as expected.
*
* <p>Note: Replace it with the Flink reader when it is supported.