[HUDI-9146][part2] Integrating FileGroup reader into Flink merge reader (#13343)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 0084fb9..56ebbc8 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -91,6 +91,11 @@
}
@Override
+ public String getMetaFieldValue(InternalRow record, int pos) {
+ return record.getString(pos);
+ }
+
+ @Override
public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
if (bufferedRecord.isDelete()) {
@@ -162,4 +167,9 @@
}
return value;
}
+
+ @Override
+ public InternalRow getDeleteRow(InternalRow record, String recordKey) {
+ throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index ec9efab..1a08c69 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,6 +25,7 @@
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair}
@@ -35,7 +36,6 @@
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index ef100f1..c15d2b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -99,6 +99,11 @@
}
@Override
+ public IndexedRecord getDeleteRow(IndexedRecord record, String recordKey) {
+ throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+ }
+
+ @Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
switch (mergeMode) {
case EVENT_TIME_ORDERING:
@@ -122,6 +127,11 @@
}
@Override
+ public String getMetaFieldValue(IndexedRecord record, int pos) {
+ return record.get(pos).toString();
+ }
+
+ @Override
public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
if (bufferedRecord.isDelete()) {
return SpillableMapUtils.generateEmptyPayload(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 638c58a..61f2dea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,11 +23,13 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableFilterIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
@@ -46,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
+import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
@@ -74,6 +77,7 @@
private Boolean needsBootstrapMerge = null;
private Boolean shouldMergeUseRecordPosition = null;
protected String partitionPath;
+ protected Option<InstantRange> instantRangeOpt = Option.empty();
// for encoding and decoding schemas to the spillable map
private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
@@ -208,6 +212,23 @@
public abstract T convertAvroRecord(IndexedRecord avroRecord);
public abstract GenericRecord convertToAvroRecord(T record, Schema schema);
+
+ /**
+ * There are two cases to handle:
+ * 1). Return the delete record if it is not null;
+ * 2). otherwise, fill an empty row with the record key fields and return it.
+ *
+ * <p>For case 2, when {@code emitDelete} is true for the FileGroup reader and the payload of the DELETE record is empty,
+ * a record-key row is emitted downstream to delete data from storage by record key on a best-effort basis.
+ * Returns null if the primary key semantics have been lost, i.e., the requested schema does not include all the record key fields.
+ *
+ * @param record delete record
+ * @param recordKey record key
+ *
+ * @return Engine-specific row which contains the record key fields.
+ */
+ @Nullable
+ public abstract T getDeleteRow(T record, String recordKey);
/**
* @param mergeMode record merge mode
@@ -229,6 +250,16 @@
public abstract Object getValue(T record, Schema schema, String fieldName);
/**
+ * Gets the value of a metadata field in a more efficient way than {@code getValue}.
+ *
+ * @param record The record in engine-specific type.
+ * @param pos The position of the metadata field.
+ *
+ * @return The value for the target metadata field.
+ */
+ public abstract String getMetaFieldValue(T record, int pos);
+
+ /**
* Cast to Java boolean value.
* If the object is not compatible with boolean type, throws.
*/
@@ -242,6 +273,28 @@
}
/**
+ * Get the {@link InstantRange} filter.
+ */
+ public Option<InstantRange> getInstantRange() {
+ return instantRangeOpt;
+ }
+
+ /**
+ * Apply the {@link InstantRange} filter to the file record iterator.
+ *
+ * @param fileRecordIterator File record iterator.
+ *
+ * @return File record iterator filtered by the {@link InstantRange}.
+ */
+ public ClosableIterator<T> applyInstantRangeFilter(ClosableIterator<T> fileRecordIterator) {
+ InstantRange instantRange = getInstantRange().get();
+ final Schema.Field commitTimeField = schemaHandler.getRequiredSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+ final int commitTimePos = commitTimeField.pos();
+ Predicate<T> instantFilter = row -> instantRange.isInRange(getMetaFieldValue(row, commitTimePos));
+ return new CloseableFilterIterator<>(fileRecordIterator, instantFilter);
+ }
+
+ /**
* Gets the record key in String.
*
* @param record The record in engine-specific type.
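
The `getDeleteRow` contract above leaves the row construction to each engine. Below is a minimal sketch of one way to satisfy it, assuming a hypothetical engine whose row type is `Object[]` and whose record key positions were pre-computed; all names are invented for illustration and are not part of the patch.

```java
// Hedged sketch only: a hypothetical engine integration over Object[] rows,
// illustrating the getDeleteRow contract documented above.
class ArrayRowDeleteSupport {
  private final int[] keyFieldPositions;  // positions of the record key fields, -1 if projected away
  private final int requiredFieldCount;   // width of the required schema

  ArrayRowDeleteSupport(int[] keyFieldPositions, int requiredFieldCount) {
    this.keyFieldPositions = keyFieldPositions;
    this.requiredFieldCount = requiredFieldCount;
  }

  Object[] getDeleteRow(Object[] record, String recordKey) {
    if (record != null) {
      return record;               // case 1: the delete record still carries data
    }
    for (int pos : keyFieldPositions) {
      if (pos < 0) {
        return null;               // primary key semantics lost, nothing meaningful to emit
      }
    }
    String[] keyParts = recordKey.split(",");   // assumes string-typed key parts
    Object[] keyRow = new Object[requiredFieldCount];
    for (int i = 0; i < keyFieldPositions.length && i < keyParts.length; i++) {
      keyRow[keyFieldPositions[i]] = keyParts[i];
    }
    return keyRow;                 // case 2: key-only row for best-effort deletes
  }
}
```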
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index e39dbf3..816ec21 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -90,6 +90,7 @@
private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
private final boolean hasBuiltInDelete;
+ private final int hoodieOperationPos;
public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
Schema tableSchema,
@@ -109,6 +110,7 @@
this.customDeleteMarkerKeyValue = deleteConfigs.getLeft();
this.hasBuiltInDelete = deleteConfigs.getRight();
this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema());
+ this.hoodieOperationPos = Option.ofNullable(requiredSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)).map(Schema.Field::pos).orElse(-1);
this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
@@ -150,6 +152,10 @@
return hasBuiltInDelete;
}
+ public int getHoodieOperationPos() {
+ return hoodieOperationPos;
+ }
+
private InternalSchema pruneInternalSchema(Schema requiredSchema, Option<InternalSchema> internalSchemaOption) {
if (!internalSchemaOption.isPresent()) {
return InternalSchema.getEmptyInternalSchema();
@@ -172,8 +178,13 @@
@VisibleForTesting
Schema generateRequiredSchema() {
- //might need to change this if other queries than mor have mandatory fields
+ boolean hasInstantRange = readerContext.getInstantRange().isPresent();
if (!needsMORMerge) {
+ if (hasInstantRange && !findNestedField(requestedSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD).isPresent()) {
+ List<Schema.Field> addedFields = new ArrayList<>();
+ addedFields.add(getField(tableSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+ return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
+ }
return requestedSchema;
}
@@ -186,14 +197,9 @@
List<Schema.Field> addedFields = new ArrayList<>();
for (String field : getMandatoryFieldsForMerging(
hoodieTableConfig, properties, tableSchema, recordMerger,
- hasBuiltInDelete, customDeleteMarkerKeyValue)) {
+ hasBuiltInDelete, customDeleteMarkerKeyValue, hasInstantRange)) {
if (!findNestedField(requestedSchema, field).isPresent()) {
- Option<Schema.Field> foundFieldOpt = findNestedField(tableSchema, field);
- if (!foundFieldOpt.isPresent()) {
- throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
- }
- Schema.Field foundField = foundFieldOpt.get();
- addedFields.add(foundField);
+ addedFields.add(getField(tableSchema, field));
}
}
@@ -209,7 +215,8 @@
Schema tableSchema,
Option<HoodieRecordMerger> recordMerger,
boolean hasBuiltInDelete,
- Option<Pair<String, String>> customDeleteMarkerKeyAndValue) {
+ Option<Pair<String, String>> customDeleteMarkerKeyAndValue,
+ boolean hasInstantRange) {
Triple<RecordMergeMode, String, String> mergingConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
cfg.getRecordMergeMode(),
cfg.getPayloadClass(),
@@ -223,6 +230,11 @@
// Use Set to avoid duplicated fields.
Set<String> requiredFields = new HashSet<>();
+
+ if (hasInstantRange) {
+ requiredFields.add(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+ }
+
// Add record key fields.
if (cfg.populateMetaFields()) {
requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
@@ -292,6 +304,17 @@
}
/**
+ * Get {@link Schema.Field} from {@link Schema} by field name.
+ */
+ private static Schema.Field getField(Schema schema, String fieldName) {
+ Option<Schema.Field> foundFieldOpt = findNestedField(schema, fieldName);
+ if (!foundFieldOpt.isPresent()) {
+ throw new IllegalArgumentException("Field: " + fieldName + " does not exist in the table schema");
+ }
+ return foundFieldOpt.get();
+ }
+
+ /**
* Fetches the delete configs from the configs.
*
* @param props write and table configs that contain delete related properties
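
The schema handler now widens the required schema with `_hoodie_commit_time` whenever an instant range is pushed down, so the commit-time filter can be evaluated even if the query did not project that column. A rough, Avro-only sketch of that rule follows; the patch itself uses Hudi's `appendFieldsToSchemaDedupNested`, which additionally handles nested fields.

```java
// Avro-only sketch (assumption: flat schemas, no nested key fields).
import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;

final class CommitTimeSchemaWidening {
  static Schema addCommitTimeIfMissing(Schema requested, Schema tableSchema) {
    String commitTime = "_hoodie_commit_time";
    if (requested.getField(commitTime) != null) {
      return requested;                       // already requested, nothing to do
    }
    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field f : requested.getFields()) {
      // Avro fields cannot be reused across schemas, so copy each one
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    Schema.Field source = tableSchema.getField(commitTime);
    fields.add(new Schema.Field(source.name(), source.schema(), source.doc(), source.defaultVal()));
    return Schema.createRecord(requested.getName(), requested.getDoc(), requested.getNamespace(),
        requested.isError(), fields);
  }
}
```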
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 0f89a6c..020e19c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -86,6 +86,7 @@
protected final HoodieReadStats readStats;
protected final boolean shouldCheckCustomDeleteMarker;
protected final boolean shouldCheckBuiltInDeleteMarker;
+ protected final boolean emitDelete;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<BufferedRecord<T>> logRecordIterator;
protected T nextRecord;
@@ -99,7 +100,8 @@
RecordMergeMode recordMergeMode,
TypedProperties props,
HoodieReadStats readStats,
- Option<String> orderingFieldName) {
+ Option<String> orderingFieldName,
+ boolean emitDelete) {
this.readerContext = readerContext;
this.readerSchema = AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
this.recordMergeMode = recordMergeMode;
@@ -120,6 +122,7 @@
boolean isBitCaskDiskMapCompressionEnabled = props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
this.readStats = readStats;
+ this.emitDelete = emitDelete;
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -163,6 +166,18 @@
return columnValue != null && readerContext.castToBoolean(columnValue);
}
+ /**
+ * Returns whether the record is a DELETE marked by the '_hoodie_operation' field.
+ */
+ protected final boolean isDeleteHoodieOperation(T record) {
+ int hoodieOperationPos = readerContext.getSchemaHandler().getHoodieOperationPos();
+ if (hoodieOperationPos < 0) {
+ return false;
+ }
+ String hoodieOperation = readerContext.getMetaFieldValue(record, hoodieOperationPos);
+ return hoodieOperation != null && HoodieOperation.isDeleteRecord(hoodieOperation);
+ }
+
@Override
public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
this.baseFileIterator = baseFileIterator;
@@ -539,15 +554,20 @@
protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInfo) throws IOException {
if (logRecordInfo != null) {
- BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, orderingFieldName, false);
- Pair<Boolean, T> isDeleteAndRecord = merge(bufferedRecord, logRecordInfo);
+ BufferedRecord<T> baseRecordInfo = BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, orderingFieldName, false);
+ Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
if (!isDeleteAndRecord.getLeft()) {
// Updates
nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
readStats.incrementNumUpdates();
return true;
+ } else if (emitDelete) {
+ // emit deletes
+ nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
+ readStats.incrementNumDeletes();
+ return nextRecord != null;
} else {
- // Deletes
+ // do not emit deletes
readStats.incrementNumDeletes();
return false;
}
@@ -570,6 +590,12 @@
nextRecord = nextRecordInfo.getRecord();
readStats.incrementNumInserts();
return true;
+ } else if (emitDelete) {
+ nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey());
+ readStats.incrementNumDeletes();
+ if (nextRecord != null) {
+ return true;
+ }
} else {
readStats.incrementNumDeletes();
}
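
With `isDeleteHoodieOperation` added, the buffers now recognize a delete through any of three signals: the built-in `_hoodie_is_deleted` flag, a table-configured custom delete marker, or a delete code in the `_hoodie_operation` meta field. A hedged, Avro-based illustration of the combined check follows; the patch routes these lookups through the engine-agnostic reader context instead.

```java
// Illustration only: classify an Avro record as a delete using the three checks
// combined in the record buffers. "D" is assumed here as the delete operation code;
// other change-log codes are omitted for brevity.
import org.apache.avro.generic.GenericRecord;

final class DeleteClassification {
  static boolean looksLikeDelete(GenericRecord rec, String customDeleteKey, String customDeleteValue) {
    // 1. built-in delete marker
    if (rec.getSchema().getField("_hoodie_is_deleted") != null
        && Boolean.TRUE.equals(rec.get("_hoodie_is_deleted"))) {
      return true;
    }
    // 2. custom delete marker configured on the table
    if (customDeleteKey != null && rec.getSchema().getField(customDeleteKey) != null) {
      Object v = rec.get(customDeleteKey);
      if (v != null && customDeleteValue.equals(v.toString())) {
        return true;
      }
    }
    // 3. delete flagged through the _hoodie_operation meta field
    if (rec.getSchema().getField("_hoodie_operation") != null) {
      Object op = rec.get("_hoodie_operation");
      return op != null && "D".equals(op.toString());
    }
    return false;
  }
}
```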
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index ed9e258..13dc39a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -112,13 +112,13 @@
long start, long length, boolean shouldUseRecordPosition) {
this(readerContext, storage, tablePath, latestCommitTime, fileSlice, dataSchema,
requestedSchema, internalSchemaOpt, hoodieTableMetaClient, props, start, length,
- shouldUseRecordPosition, false);
+ shouldUseRecordPosition, false, false);
}
private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, String tablePath,
String latestCommitTime, FileSlice fileSlice, Schema dataSchema, Schema requestedSchema,
Option<InternalSchema> internalSchemaOpt, HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
- long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants) {
+ long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete) {
this.readerContext = readerContext;
this.storage = storage;
this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -169,7 +169,7 @@
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
- isSkipMerge, shouldUseRecordPosition, readStats);
+ isSkipMerge, shouldUseRecordPosition, readStats, emitDelete);
this.allowInflightInstants = allowInflightInstants;
}
@@ -184,18 +184,19 @@
boolean hasNoLogFiles,
boolean isSkipMerge,
boolean shouldUseRecordPosition,
- HoodieReadStats readStats) {
+ HoodieReadStats readStats,
+ boolean emitDelete) {
if (hasNoLogFiles) {
return null;
} else if (isSkipMerge) {
return new UnmergedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats);
+ readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, emitDelete);
} else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
return new PositionBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName);
+ readerContext, hoodieTableMetaClient, recordMergeMode, baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName, emitDelete);
} else {
return new KeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
+ readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName, emitDelete);
}
}
@@ -224,17 +225,19 @@
}
StoragePathInfo baseFileStoragePathInfo = baseFile.getPathInfo();
+ final ClosableIterator<T> recordIterator;
if (baseFileStoragePathInfo != null) {
- return readerContext.getFileRecordIterator(
+ recordIterator = readerContext.getFileRecordIterator(
baseFileStoragePathInfo, start, length,
readerContext.getSchemaHandler().getTableSchema(),
readerContext.getSchemaHandler().getRequiredSchema(), storage);
} else {
- return readerContext.getFileRecordIterator(
+ recordIterator = readerContext.getFileRecordIterator(
baseFile.getStoragePath(), start, length,
readerContext.getSchemaHandler().getTableSchema(),
readerContext.getSchemaHandler().getRequiredSchema(), storage);
}
+ return readerContext.getInstantRange().isPresent() ? readerContext.applyInstantRangeFilter(recordIterator) : recordIterator;
}
private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
@@ -341,6 +344,7 @@
.withLogFiles(logFiles)
.withReverseReader(false)
.withBufferSize(getIntWithAltKeys(props, HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
+ .withInstantRange(readerContext.getInstantRange())
.withPartition(getRelativePartitionPath(
new StoragePath(path), logFiles.get(0).getPath().getParent()))
.withRecordBuffer(recordBuffer)
@@ -435,6 +439,7 @@
private long length = Long.MAX_VALUE;
private boolean shouldUseRecordPosition = false;
private boolean allowInflightInstants = false;
+ private boolean emitDelete;
public Builder<T> withReaderContext(HoodieReaderContext<T> readerContext) {
this.readerContext = readerContext;
@@ -497,6 +502,11 @@
return this;
}
+ public Builder<T> withEmitDelete(boolean emitDelete) {
+ this.emitDelete = emitDelete;
+ return this;
+ }
+
public HoodieFileGroupReader<T> build() {
ValidationUtils.checkArgument(readerContext != null, "Reader context is required");
ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table meta client is required");
@@ -515,7 +525,7 @@
return new HoodieFileGroupReader<>(
readerContext, storage, tablePath, latestCommitTime, fileSlice,
dataSchema, requestedSchema, internalSchemaOpt, hoodieTableMetaClient,
- props, start, length, shouldUseRecordPosition, allowInflightInstants);
+ props, start, length, shouldUseRecordPosition, allowInflightInstants, emitDelete);
}
}
}
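
For downstream callers, the new flag is set through the builder. Here is a hedged usage sketch; builder methods not shown in this patch, such as `withFileSlice`, `withDataSchema` and `withRequestedSchema`, are assumed from the reader's constructor arguments and may differ, and all inputs are prepared by the caller.

```java
// Hedged usage sketch of the new emitDelete flag on HoodieFileGroupReader.Builder.
static ClosableIterator<RowData> openWithDeletes(
    HoodieReaderContext<RowData> readerContext,
    HoodieTableMetaClient metaClient,
    FileSlice fileSlice,
    Schema tableSchema,
    Schema requestedSchema,
    TypedProperties props,
    String latestCommitTime) throws IOException {
  return HoodieFileGroupReader.<RowData>newBuilder()
      .withReaderContext(readerContext)
      .withHoodieTableMetaClient(metaClient)
      .withLatestCommitTime(latestCommitTime)
      .withFileSlice(fileSlice)                // assumed builder method
      .withDataSchema(tableSchema)             // assumed builder method
      .withRequestedSchema(requestedSchema)    // assumed builder method
      .withProps(props)
      .withShouldUseRecordPosition(false)
      .withEmitDelete(true)                    // new in this patch: surface DELETE rows
      .build()
      .getClosableIterator();                  // delete rows arrive with RowKind.DELETE (Flink)
}
```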
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 2ccc740..5bbce2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -53,8 +53,9 @@
RecordMergeMode recordMergeMode,
TypedProperties props,
HoodieReadStats readStats,
- Option<String> orderingFieldName) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
+ Option<String> orderingFieldName,
+ boolean emitDelete) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName, emitDelete);
}
@Override
@@ -77,7 +78,7 @@
try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
while (recordIterator.hasNext()) {
T nextRecord = recordIterator.next();
- boolean isDelete = isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord);
+ boolean isDelete = isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord) || isDeleteHoodieOperation(nextRecord);
BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, orderingFieldName, isDelete);
processNextDataRecord(bufferedRecord, bufferedRecord.getRecordKey());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 5df2ea9..152b710 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -71,8 +71,9 @@
String baseFileInstantTime,
TypedProperties props,
HoodieReadStats readStats,
- Option<String> orderingFieldName) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
+ Option<String> orderingFieldName,
+ boolean emitDelete) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName, emitDelete);
this.baseFileInstantTime = baseFileInstantTime;
}
@@ -130,7 +131,7 @@
long recordPosition = recordPositions.get(recordIndex++);
T evolvedNextRecord = schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
- boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord);
+ boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord) || isDeleteHoodieOperation(evolvedNextRecord);
BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(evolvedNextRecord, schema, readerContext, orderingFieldName, isDelete);
processNextDataRecord(bufferedRecord, recordPosition);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index c853eb0..a9e1f66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -50,8 +50,9 @@
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
TypedProperties props,
- HoodieReadStats readStats) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, Option.empty());
+ HoodieReadStats readStats,
+ boolean emitDelete) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, Option.empty(), emitDelete);
this.currentInstantLogBlocks = new ArrayDeque<>();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
new file mode 100644
index 0000000..c706d84
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+/**
+ * A {@link FilterIterator} that must be closed after iteration (to clean up resources).
+ */
+public class CloseableFilterIterator<R> extends FilterIterator<R> implements ClosableIterator<R> {
+
+ public CloseableFilterIterator(Iterator<R> source, Predicate<R> filter) {
+ super(source, filter);
+ }
+
+ @Override
+ public void close() {
+ ((ClosableIterator<R>) source).close();
+ }
+}
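
A small usage sketch of the new iterator: wrap any `ClosableIterator` with a predicate, which is how `applyInstantRangeFilter` keeps only rows whose commit time falls inside the `InstantRange`. The string source below is purely illustrative; note that the wrapped source must itself be a `ClosableIterator`, because `close()` delegates to it.

```java
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableFilterIterator;

import java.util.Arrays;
import java.util.Iterator;

public class FilterIteratorExample {
  public static void main(String[] args) {
    Iterator<String> commits = Arrays.asList("20240101", "20240301", "20240501").iterator();
    ClosableIterator<String> source = new ClosableIterator<String>() {
      @Override public boolean hasNext() { return commits.hasNext(); }
      @Override public String next() { return commits.next(); }
      @Override public void close() { /* release file handles etc. */ }
    };
    // keep only commits after 20240201, analogous to instantRange::isInRange
    try (ClosableIterator<String> filtered = new CloseableFilterIterator<>(source, c -> c.compareTo("20240201") > 0)) {
      filtered.forEachRemaining(System.out::println); // prints 20240301, 20240501
    }
  }
}
```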
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
index 4ebc7c1..37b5565 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
@@ -29,7 +29,7 @@
*/
public class FilterIterator<R> implements Iterator<R> {
- private final Iterator<R> source;
+ protected final Iterator<R> source;
private final Predicate<R> filter;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index 06552a9..71f0b35 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -153,6 +153,7 @@
HoodieTableVersion tableVersion,
String mergeStrategyId) {
HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+ when(readerContext.getInstantRange()).thenReturn(Option.empty());
when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
when(readerContext.getHasLogFiles()).thenReturn(true);
HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
@@ -290,7 +291,8 @@
RecordMergeMode.COMMIT_TIME_ORDERING,
props,
readStats,
- Option.empty());
+ Option.empty(),
+ false);
when(readerContext.getValue(any(), any(), any())).thenReturn(null);
assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
@@ -302,7 +304,8 @@
RecordMergeMode.COMMIT_TIME_ORDERING,
props,
readStats,
- Option.empty());
+ Option.empty(),
+ false);
when(readerContext.getValue(any(), any(), any())).thenReturn("i");
assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
when(readerContext.getValue(any(), any(), any())).thenReturn("d");
@@ -323,7 +326,8 @@
RecordMergeMode.COMMIT_TIME_ORDERING,
props,
readStats,
- Option.empty());
+ Option.empty(),
+ false);
// CASE 1: With custom delete marker.
GenericRecord record = new GenericData.Record(schema);
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index 5e7613f..565543e 100644
--- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -342,6 +342,7 @@
/**
* Returns the scanner to read avro log files.
*/
+ @Deprecated
private static HoodieMergedLogRecordScanner getScanner(
HoodieStorage storage,
String basePath,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 690ef55..7159f9a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -185,7 +185,7 @@
Supplier<InternalSchemaManager> internalSchemaManager = () -> InternalSchemaManager.get(conf, metaClient);
// initialize storage conf lazily.
StorageConfiguration<?> readerConf = writeClient.getEngineContext().getStorageConf();
- return Option.of(new FlinkRowDataReaderContext(readerConf, internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig()));
+ return Option.of(new FlinkRowDataReaderContext(readerConf, internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(), Option.empty()));
} else {
// always using avro record merger for legacy compaction since log scanner do not support rowdata reading yet.
writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 8088604..cfea2d2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -22,7 +22,6 @@
import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -34,7 +33,9 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -43,7 +44,8 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.io.storage.HoodieIOFactory;
-import org.apache.hudi.source.ExpressionPredicates.Predicate;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -52,6 +54,7 @@
import org.apache.hudi.util.RowDataUtils;
import org.apache.hudi.util.RowProjection;
import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
+import org.apache.hudi.util.StringToRowDataConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -65,6 +68,7 @@
import org.apache.flink.types.RowKind;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -79,21 +83,28 @@
* log files with Flink parquet reader.
*/
public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
- private final List<Predicate> predicates;
+ private final List<ExpressionPredicates.Predicate> predicates;
private final Supplier<InternalSchemaManager> internalSchemaManager;
private final boolean utcTimezone;
- private final HoodieConfig hoodieConfig;
+ private final HoodieTableConfig tableConfig;
+ // The converter is used to create a RowData that contains only the primary key fields
+ // for DELETE cases. It is not initialized if the primary key semantics are lost,
+ // e.g., if the pk fields are [a, b] but the user only selects a, then the pk
+ // semantics are lost.
+ private StringToRowDataConverter recordKeyRowConverter;
public FlinkRowDataReaderContext(
StorageConfiguration<?> storageConfiguration,
Supplier<InternalSchemaManager> internalSchemaManager,
- List<Predicate> predicates,
- HoodieTableConfig tableConfig) {
+ List<ExpressionPredicates.Predicate> predicates,
+ HoodieTableConfig tableConfig,
+ Option<InstantRange> instantRangeOpt) {
super(storageConfiguration, tableConfig);
- this.hoodieConfig = tableConfig;
+ this.tableConfig = tableConfig;
this.internalSchemaManager = internalSchemaManager;
this.predicates = predicates;
this.utcTimezone = getStorageConfiguration().getBoolean(FlinkOptions.READ_UTC_TIMEZONE.key(), FlinkOptions.READ_UTC_TIMEZONE.defaultValue());
+ this.instantRangeOpt = instantRangeOpt;
}
@Override
@@ -111,9 +122,32 @@
HoodieRowDataParquetReader rowDataParquetReader =
(HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
- .getFileReader(hoodieConfig, filePath, HoodieFileFormat.PARQUET, Option.empty());
+ .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, Option.empty());
DataType rowType = RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
- return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema);
+ return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema, predicates);
+ }
+
+ @Override
+ public void setSchemaHandler(FileGroupReaderSchemaHandler<RowData> schemaHandler) {
+ super.setSchemaHandler(schemaHandler);
+
+ Option<String[]> recordKeysOpt = tableConfig.getRecordKeyFields();
+ if (recordKeysOpt.isEmpty()) {
+ return;
+ }
+ // primary key semantics are lost if not all primary key fields are included in the requested schema.
+ boolean pkSemanticLost = Arrays.stream(recordKeysOpt.get()).anyMatch(k -> schemaHandler.getRequestedSchema().getField(k) == null);
+ if (pkSemanticLost) {
+ return;
+ }
+ // get primary key field positions in the required schema.
+ Schema requiredSchema = schemaHandler.getRequiredSchema();
+ int[] pkFieldsPos = Arrays.stream(recordKeysOpt.get())
+ .map(k -> Option.ofNullable(requiredSchema.getField(k)).map(Schema.Field::pos).orElse(-1))
+ .mapToInt(Integer::intValue)
+ .toArray();
+ recordKeyRowConverter = new StringToRowDataConverter(
+ pkFieldsPos, (RowType) RowDataAvroQueryContexts.fromAvroSchema(requiredSchema).getRowType().getLogicalType());
}
@Override
@@ -145,6 +179,11 @@
}
@Override
+ public String getMetaFieldValue(RowData record, int pos) {
+ return record.getString(pos).toString();
+ }
+
+ @Override
public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> bufferedRecord) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
// delete record
@@ -257,6 +296,21 @@
}
@Override
+ public RowData getDeleteRow(RowData record, String recordKey) {
+ if (record != null) {
+ return record;
+ }
+ // no need to emit a record key row if the primary key semantics are lost
+ if (recordKeyRowConverter == null) {
+ return null;
+ }
+ final String[] pkVals = KeyGenUtils.extractRecordKeys(recordKey);
+ RowData recordKeyRow = recordKeyRowConverter.convert(pkVals);
+ recordKeyRow.setRowKind(RowKind.DELETE);
+ return recordKeyRow;
+ }
+
+ @Override
public RowData convertAvroRecord(IndexedRecord avroRecord) {
Schema recordSchema = avroRecord.getSchema();
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone).getAvroToRowDataConverter();
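
The Flink `getDeleteRow` above relies on `KeyGenUtils.extractRecordKeys` plus the rebuilt `StringToRowDataConverter` to turn a record key string back into a key-only row. Below is a simplified sketch of the resulting row shape, assuming string-typed key fields and skipping the per-field type conversion the real converter performs.

```java
// Simplified illustration: write the extracted key parts into an otherwise empty
// row at the key fields' positions and tag it as a DELETE.
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

final class KeyOnlyDeleteRows {
  static GenericRowData keyOnlyDeleteRow(String[] keyParts, int[] keyPositions, int fieldCount) {
    GenericRowData row = new GenericRowData(fieldCount);   // all non-key fields stay null
    for (int i = 0; i < keyPositions.length; i++) {
      row.setField(keyPositions[i], StringData.fromString(keyParts[i]));
    }
    row.setRowKind(RowKind.DELETE);
    return row;
  }
}
```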
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 811f68d..b4e5d14 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -135,6 +135,7 @@
}
}
+ @Deprecated
public static HoodieMergedLogRecordScanner logScanner(
MergeOnReadInputSplit split,
Schema logSchema,
@@ -163,6 +164,7 @@
.build();
}
+ @Deprecated
public static HoodieMergedLogRecordScanner logScanner(
List<String> logPaths,
Schema logSchema,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index 2902aea..e68ebcc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -30,6 +30,7 @@
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -43,6 +44,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -81,7 +83,7 @@
@Override
public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
- ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), requestedSchema);
+ ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), requestedSchema, Collections.emptyList());
readerIterators.add(rowDataItr);
return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
}
@@ -89,12 +91,16 @@
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
Schema schema = HoodieAvroUtils.getRecordKeySchema();
- ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema);
+ ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema, Collections.emptyList());
return new CloseableMappingIterator<>(rowDataItr, rowData -> Objects.toString(rowData.getString(0)));
}
- public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager internalSchemaManager, DataType dataType, Schema requestedSchema) throws IOException {
- return RecordIterators.getParquetRecordIterator(storage.getConf(), internalSchemaManager, dataType, requestedSchema, path);
+ public ClosableIterator<RowData> getRowDataIterator(
+ InternalSchemaManager internalSchemaManager,
+ DataType dataType,
+ Schema requestedSchema,
+ List<Predicate> predicates) throws IOException {
+ return RecordIterators.getParquetRecordIterator(storage.getConf(), internalSchemaManager, dataType, requestedSchema, path, predicates);
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 8fd0f83..120e50e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -43,7 +43,6 @@
import org.apache.parquet.hadoop.util.SerializationUtil;
import java.io.IOException;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,8 @@
InternalSchemaManager internalSchemaManager,
DataType dataType,
Schema requestedSchema,
- StoragePath path) throws IOException {
+ StoragePath path,
+ List<Predicate> predicates) throws IOException {
List<String> fieldNames = ((RowType) dataType.getLogicalType()).getFieldNames();
List<DataType> fieldTypes = dataType.getChildren();
int[] selectedFields = requestedSchema.getFields().stream().map(Schema.Field::name)
@@ -87,7 +87,7 @@
new org.apache.flink.core.fs.Path(path.toUri()),
0L,
Long.MAX_VALUE,
- Collections.emptyList());
+ predicates);
}
public static ClosableIterator<RowData> getParquetRecordIterator(
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index e8b766b..84c9053 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -135,7 +135,7 @@
return getBaseFileIteratorWithMetadata(split.getBasePath().get());
} else if (!split.getBasePath().isPresent()) {
// log files only
- return new LogFileOnlyIterator(getFullLogFileIterator(split));
+ return getFullLogFileIterator(split);
} else {
Schema tableSchema = new Schema.Parser().parse(this.tableState.getAvroSchema());
return new MergeIterator(
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index dcd7a5f..ae35cee 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -34,6 +34,7 @@
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -42,10 +43,8 @@
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FlinkRowDataReaderContext;
@@ -58,11 +57,9 @@
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.util.StringToRowDataConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
@@ -70,15 +67,12 @@
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -91,7 +85,6 @@
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
/**
* The base InputFormat class to read from Hoodie data + log files.
@@ -208,51 +201,24 @@
}
protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit split) throws IOException {
- if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
- if (split.getInstantRange().isPresent()) {
- // base file only with commit time filtering
- return new BaseFileOnlyFilteringIterator(
- split.getInstantRange().get(),
- this.tableState.getRequiredRowType(),
- this.requiredPos,
- getBaseFileIterator(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
- } else {
- // base file only
- return getBaseFileIterator(split.getBasePath().get());
- }
- } else if (!split.getBasePath().isPresent()) {
- // log files only
+ String mergeType = split.getMergeType();
+ if (!split.getBasePath().isPresent()) {
if (OptionsResolver.emitDeletes(conf)) {
- return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+ mergeType = FlinkOptions.REALTIME_SKIP_MERGE;
} else {
- return new LogFileOnlyIterator(getLogFileIterator(split));
+ // always merge records in log files if there is no base file (aligned with legacy behaviour)
+ mergeType = FlinkOptions.REALTIME_PAYLOAD_COMBINE;
}
- } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
- return new SkipMergeIterator(
- getBaseFileIterator(split.getBasePath().get()),
- getLogFileIterator(split));
- } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
- return new MergeIterator(
- conf,
- hadoopConf,
- split,
- this.tableState.getRowType(),
- this.tableState.getRequiredRowType(),
- new Schema.Parser().parse(this.tableState.getAvroSchema()),
- new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
- internalSchemaManager.getQuerySchema(),
- this.requiredPos,
- this.emitDelete,
- this.tableState.getOperationPos(),
- getBaseFileIteratorWithMetadata(split.getBasePath().get()));
- } else {
- throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
- + "file path: " + split.getBasePath()
- + "log paths: " + split.getLogPaths()
- + "hoodie table path: " + split.getTablePath()
- + "flink partition Index: " + split.getSplitNumber()
- + "merge type: " + split.getMergeType());
}
+ ValidationUtils.checkArgument(
+ mergeType.equals(FlinkOptions.REALTIME_SKIP_MERGE) || mergeType.equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE),
+ "Unable to select an Iterator to read the Hoodie MOR File Split for "
+ + "file path: " + split.getBasePath()
+ + "log paths: " + split.getLogPaths()
+ + "hoodie table path: " + split.getTablePath()
+ + "flink partition Index: " + split.getSplitNumber()
+ + "merge type: " + split.getMergeType());
+ return getSplitIterator(split, mergeType);
}
@Override
@@ -363,93 +329,15 @@
predicates);
}
- private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
- final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
- final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
- final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
- final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
- AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
- final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf);
- final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
- final int[] pkOffset = tableState.getPkOffsetsInRequired();
- // flag saying whether the pk semantics has been dropped by user specified
- // projections. For e.g, if the pk fields are [a, b] but user only select a,
- // then the pk semantics is lost.
- final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
- final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
- final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
-
- return new ClosableIterator<RowData>() {
- private RowData currentRecord;
-
- @Override
- public boolean hasNext() {
- while (logRecordsKeyIterator.hasNext()) {
- String curAvroKey = logRecordsKeyIterator.next();
- Option<IndexedRecord> curAvroRecord = null;
- final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) scanner.getRecords().get(curAvroKey);
- try {
- curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
- } catch (IOException e) {
- throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
- }
- if (!curAvroRecord.isPresent()) {
- // delete record found
- if (emitDelete && !pkSemanticLost) {
- GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
-
- final String recordKey = hoodieRecord.getRecordKey();
- final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
- final Object[] converted = converter.convert(pkFields);
- for (int i = 0; i < pkOffset.length; i++) {
- delete.setField(pkOffset[i], converted[i]);
- }
- delete.setRowKind(RowKind.DELETE);
-
- this.currentRecord = delete;
- return true;
- }
- // skipping if the condition is unsatisfied
- // continue;
- } else {
- final IndexedRecord avroRecord = curAvroRecord.get();
- final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos());
- if (rowKind == RowKind.DELETE && !emitDelete) {
- // skip the delete record
- continue;
- }
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
- avroRecord,
- requiredSchema,
- requiredPos,
- recordBuilder);
- currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
- currentRecord.setRowKind(rowKind);
- return true;
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- scanner.close();
- }
- };
- }
-
/**
* Get record iterator using {@link HoodieFileGroupReader}.
*
* @param split input split
- * @return {@link RowData} iterator.
+ * @param mergeType merge type for the FileGroup reader
+ *
+ * @return {@link RowData} iterator for the given split.
*/
- private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
+ private ClosableIterator<RowData> getSplitIterator(MergeOnReadInputSplit split, String mergeType) throws IOException {
final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
@@ -466,11 +354,12 @@
HadoopFSUtils.getStorageConf(hadoopConf),
() -> internalSchemaManager,
predicates,
- metaClient.getTableConfig());
+ metaClient.getTableConfig(),
+ split.getInstantRange());
TypedProperties typedProps = FlinkClientUtil.getMergedTableAndWriteProps(metaClient.getTableConfig(), writeConfig);
- typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
+ typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
- try (HoodieFileGroupReader<RowData> fileGroupReader = HoodieFileGroupReader.<RowData>newBuilder()
+ HoodieFileGroupReader<RowData> fileGroupReader = HoodieFileGroupReader.<RowData>newBuilder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(split.getLatestCommit())
@@ -480,11 +369,9 @@
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
.withProps(typedProps)
.withShouldUseRecordPosition(false)
- .build()) {
- return fileGroupReader.getClosableIterator();
- } catch (IOException e) {
- throw new HoodieUpsertException("Failed to compact file slice: " + fileSlice, e);
- }
+ .withEmitDelete(emitDelete)
+ .build();
+ return fileGroupReader.getClosableIterator();
}
protected static Option<IndexedRecord> getInsertVal(HoodieAvroRecord<?> hoodieRecord, Schema tableSchema) {
@@ -551,130 +438,6 @@
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
- /**
- * Base record iterator with instant time filtering.
- */
- static class BaseFileOnlyFilteringIterator implements ClosableIterator<RowData> {
- // base file record iterator
- private final ClosableIterator<RowData> nested;
- private final InstantRange instantRange;
- private final RowDataProjection projection;
-
- private RowData currentRecord;
-
- private int commitTimePos;
-
- BaseFileOnlyFilteringIterator(
- InstantRange instantRange,
- RowType requiredRowType,
- int[] requiredPos,
- ClosableIterator<RowData> nested) {
- this.nested = nested;
- this.instantRange = instantRange;
- this.commitTimePos = getCommitTimePos(requiredPos);
- int[] positions;
- if (commitTimePos < 0) {
- commitTimePos = 0;
- positions = IntStream.range(1, 1 + requiredPos.length).toArray();
- } else {
- positions = IntStream.range(0, requiredPos.length).toArray();
- }
- this.projection = RowDataProjection.instance(requiredRowType, positions);
- }
-
- @Override
- public boolean hasNext() {
- while (this.nested.hasNext()) {
- currentRecord = this.nested.next();
- boolean isInRange = instantRange.isInRange(currentRecord.getString(commitTimePos).toString());
- if (isInRange) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- // can promote: no need to project with null instant range
- return projection.project(currentRecord);
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- }
- }
- }
-
- protected static class LogFileOnlyIterator implements ClosableIterator<RowData> {
- // iterator for log files
- private final ClosableIterator<RowData> iterator;
-
- public LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public boolean hasNext() {
- return this.iterator.hasNext();
- }
-
- @Override
- public RowData next() {
- return this.iterator.next();
- }
-
- @Override
- public void close() {
- if (this.iterator != null) {
- this.iterator.close();
- }
- }
- }
-
- static class SkipMergeIterator implements ClosableIterator<RowData> {
- // base file record iterator
- private final ClosableIterator<RowData> nested;
- // iterator for log files
- private final ClosableIterator<RowData> iterator;
-
- private RowData currentRecord;
-
- SkipMergeIterator(ClosableIterator<RowData> nested, ClosableIterator<RowData> iterator) {
- this.nested = nested;
- this.iterator = iterator;
- }
-
- @Override
- public boolean hasNext() {
- if (this.nested.hasNext()) {
- currentRecord = this.nested.next();
- return true;
- }
- if (this.iterator.hasNext()) {
- currentRecord = this.iterator.next();
- return true;
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- }
- if (this.iterator != null) {
- this.iterator.close();
- }
- }
- }
protected static class MergeIterator implements ClosableIterator<RowData> {
// base file record iterator
@@ -710,26 +473,6 @@
RowType tableRowType,
RowType requiredRowType,
Schema tableSchema,
- Schema requiredSchema,
- InternalSchema querySchema,
- int[] requiredPos,
- boolean emitDelete,
- int operationPos,
- ClosableIterator<RowData> nested) { // the iterator should be with full schema
- this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema,
- querySchema,
- Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
- Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))),
- emitDelete, operationPos, nested);
- }
-
- public MergeIterator(
- Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- MergeOnReadInputSplit split,
- RowType tableRowType,
- RowType requiredRowType,
- Schema tableSchema,
InternalSchema querySchema,
Option<RowDataProjection> projection,
Option<Function<IndexedRecord, GenericRecord>> avroProjection,
@@ -905,25 +648,6 @@
// Utilities
// -------------------------------------------------------------------------
- private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
- if (getCommitTimePos(requiredPos) >= 0) {
- return requiredPos;
- }
- int[] requiredPos2 = new int[requiredPos.length + 1];
- requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
- System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
- return requiredPos2;
- }
-
- private static int getCommitTimePos(int[] requiredPos) {
- for (int i = 0; i < requiredPos.length; i++) {
- if (requiredPos[i] == HOODIE_COMMIT_TIME_COL_POS) {
- return i;
- }
- }
- return -1;
- }
-
@VisibleForTesting
public void isEmitDelete(boolean emitDelete) {
this.emitDelete = emitDelete;
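
The removed iterators above (instant-time filtering, log-only, skip-merge) all chained base-file and log-file ClosableIterators by hand; that work now happens inside the file group reader. For readers tracing the old code path, a minimal standalone sketch of the skip-merge chaining it performed (SkipMergeSketch is an illustrative name, not part of this patch):

import org.apache.hudi.common.util.collection.ClosableIterator;

/**
 * Illustrative sketch only: drain the base-file iterator first, then the log-file
 * iterator, without merging records, mirroring what the removed SkipMergeIterator
 * did before the file group reader took over this code path.
 */
final class SkipMergeSketch<T> implements ClosableIterator<T> {
  private final ClosableIterator<T> baseIterator; // base file records
  private final ClosableIterator<T> logIterator;  // log file records
  private T current;

  SkipMergeSketch(ClosableIterator<T> baseIterator, ClosableIterator<T> logIterator) {
    this.baseIterator = baseIterator;
    this.logIterator = logIterator;
  }

  @Override
  public boolean hasNext() {
    if (baseIterator.hasNext()) {
      current = baseIterator.next();
      return true;
    }
    if (logIterator.hasNext()) {
      current = logIterator.next();
      return true;
    }
    return false;
  }

  @Override
  public T next() {
    return current;
  }

  @Override
  public void close() {
    baseIterator.close();
    logIterator.close();
  }
}
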
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 36dfecb..7f55b38 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -20,11 +20,9 @@
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.List;
/**
@@ -89,29 +87,4 @@
.mapToInt(i -> i)
.toArray();
}
-
- /**
- * Get the primary key positions in required row type.
- */
- public int[] getPkOffsetsInRequired() {
- final List<String> fieldNames = requiredRowType.getFieldNames();
- return Arrays.stream(pkFields)
- .map(fieldNames::indexOf)
- .mapToInt(i -> i)
- .toArray();
- }
-
- /**
- * Returns the primary key fields logical type with given offsets.
- *
- * @param pkOffsets the pk offsets in required row type
- * @return pk field logical types
- * @see #getPkOffsetsInRequired()
- */
- public LogicalType[] getPkTypes(int[] pkOffsets) {
- final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
- .map(RowType.RowField::getType).toArray(LogicalType[]::new);
- return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
- .toArray(LogicalType[]::new);
- }
}
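
getPkOffsetsInRequired() and getPkTypes() existed only to feed the old LogicalType[]-based StringToRowDataConverter constructor. The new constructor (see the next file) takes the pk positions plus the full RowType and resolves each field's LogicalType itself, so both helpers can go. A hedged sketch of how a caller could wire pk fields to the new converter; PkConverterFactory is a hypothetical helper, not part of this patch:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.util.StringToRowDataConverter;

// Illustrative only: derive the pk offsets from the field names and hand them,
// together with the required RowType, to the new converter; the converter looks
// up each field's LogicalType itself, so getPkTypes() is no longer needed.
final class PkConverterFactory {
  static StringToRowDataConverter create(String[] pkFields, RowType requiredRowType) {
    List<String> fieldNames = requiredRowType.getFieldNames();
    int[] pkOffsets = Arrays.stream(pkFields)
        .mapToInt(fieldNames::indexOf) // position of each pk field in the required row type
        .toArray();
    return new StringToRowDataConverter(pkOffsets, requiredRowType);
  }
}
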
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
index 6c4aae3..b9f1baf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
@@ -22,10 +22,13 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import java.math.BigDecimal;
@@ -44,22 +47,26 @@
@Internal
public class StringToRowDataConverter {
private final Converter[] converters;
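+ // positions in the produced row where the converted values are written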
+ private final int[] fieldsPos;
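+ // field count (arity) of the produced row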
+ private final int rowArity;
- public StringToRowDataConverter(LogicalType[] fieldTypes) {
- this.converters = Arrays.stream(fieldTypes)
- .map(StringToRowDataConverter::getConverter)
+ public StringToRowDataConverter(int[] fieldsPos, RowType rowType) {
+ this.fieldsPos = fieldsPos;
+ this.rowArity = rowType.getFieldCount();
+ this.converters = Arrays.stream(fieldsPos)
+ .mapToObj(f -> getConverter(rowType.getTypeAt(f)))
.toArray(Converter[]::new);
}
- public Object[] convert(String[] fields) {
+ public RowData convert(String[] fields) {
ValidationUtils.checkArgument(converters.length == fields.length,
"Field types and values should equal with number");
- Object[] converted = new Object[fields.length];
+ GenericRowData rowData = new GenericRowData(rowArity);
for (int i = 0; i < fields.length; i++) {
- converted[i] = converters[i].convert(fields[i]);
+ rowData.setField(fieldsPos[i], converters[i].convert(fields[i]));
}
- return converted;
+ return rowData;
}
private interface Converter {
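
With this change convert() builds a sparse GenericRowData of the full row arity: only the positions passed to the constructor are populated, every other field stays null. A minimal usage sketch under that reading; the row type, field names and values below are made up for illustration:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.util.StringToRowDataConverter;

public class ConverterUsageSketch {
  public static void main(String[] args) {
    // made-up 3-column row type; "uuid" at position 0 is the only key field
    RowType rowType = RowType.of(
        new LogicalType[] {
            DataTypes.STRING().getLogicalType(),
            DataTypes.STRING().getLogicalType(),
            DataTypes.INT().getLogicalType()},
        new String[] {"uuid", "name", "age"});

    // converters are created only for the configured positions
    StringToRowDataConverter converter = new StringToRowDataConverter(new int[] {0}, rowType);

    // one string per configured position; the result has arity 3 with
    // positions 1 and 2 left null
    RowData keyRow = converter.convert(new String[] {"id1"});
    System.out.println(keyRow.getArity()); // expected: 3
  }
}
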
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 0d3fde9..e915581 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -110,7 +110,8 @@
storageConf,
() -> InternalSchemaManager.DISABLED,
Collections.emptyList(),
- metaClient.getTableConfig());
+ metaClient.getTableConfig(),
+ Option.empty());
}
@Override
@@ -164,7 +165,7 @@
HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
when(tableConfig.populateMetaFields()).thenReturn(true);
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
.fields()
@@ -182,7 +183,7 @@
HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
when(tableConfig.populateMetaFields()).thenReturn(true);
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
.fields()
@@ -200,7 +201,7 @@
when(tableConfig.populateMetaFields()).thenReturn(false);
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"field1"}));
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
.fields()
@@ -218,7 +219,7 @@
when(tableConfig.populateMetaFields()).thenReturn(false);
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"field1", "field2"}));
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index bbf7e04..e5a497e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -305,6 +305,31 @@
}
@Test
+ void testReadWithDeletes() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write a second commit containing updates and deletes, then read again
+ TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+ ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+ List<RowData> result = readData(inputFormat);
+
+ final String actual = TestData.rowDataToString(result);
+ final String expected = "["
+ + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+ + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+ + "-D[id3, null, null, null, null], "
+ + "-D[id5, null, null, null, null], "
+ + "-D[id9, null, null, null, null]]";
+ assertThat(actual, is(expected));
+ }
+
+ @Test
void testReadWithDeletesMOR() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 1c671e3..ad8d2d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -982,6 +982,7 @@
/**
* Returns the scanner to read avro log files.
*/
+ @Deprecated
private static HoodieMergedLogRecordScanner getScanner(
HoodieStorage storage,
String basePath,
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
index 8f7ecad..b5f140b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
@@ -27,6 +27,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -41,7 +42,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Test cases for {@link StringToRowDataConverter}.
@@ -59,8 +60,9 @@
DataTypes.TIMESTAMP(6).getLogicalType(),
DataTypes.DECIMAL(7, 2).getLogicalType()
};
- StringToRowDataConverter converter = new StringToRowDataConverter(fieldTypes);
- Object[] converted = converter.convert(fields);
+ RowType rowType = RowType.of(fieldTypes);
+ StringToRowDataConverter converter = new StringToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
+ RowData actual = converter.convert(fields);
Object[] expected = new Object[] {
1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(),
LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY),
@@ -68,7 +70,8 @@
TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z")),
DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)
};
- assertArrayEquals(expected, converted);
+ GenericRowData expectedRow = GenericRowData.of(expected);
+ assertEquals(expectedRow, actual);
}
@Test
@@ -97,15 +100,10 @@
GenericRecord avroRecord =
(GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
StringToRowDataConverter stringToRowDataConverter =
- new StringToRowDataConverter(rowType.getChildren().toArray(new LogicalType[0]));
+ new StringToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames(), false);
final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
- Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
-
- GenericRowData converted = new GenericRowData(7);
- for (int i = 0; i < 7; i++) {
- converted.setField(i, convertedKeys[i]);
- }
+ RowData converted = stringToRowDataConverter.convert(recordKeys);
assertThat(converted, is(rowData));
}
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index fa05136..8941d6b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -178,6 +178,11 @@
}
@Override
+ public ArrayWritable getDeleteRow(ArrayWritable record, String recordKey) {
+ throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+ }
+
+ @Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
// TODO(HUDI-7843):
// get rid of event time and commit time ordering. Just return Option.empty
@@ -203,6 +208,11 @@
}
@Override
+ public String getMetaFieldValue(ArrayWritable record, int pos) {
+ return record.get()[pos].toString();
+ }
+
+ @Override
public boolean castToBoolean(Object value) {
if (value instanceof BooleanWritable) {
return ((BooleanWritable) value).get();
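
For the Hive context the new getMetaFieldValue hook just reads the writable at the requested meta-column position and renders it as a string, while getDeleteRow stays unsupported. A small self-contained illustration of that contract; MetaFieldSketch and the sample column layout are assumptions, not part of the patch:

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MetaFieldSketch {
  // mirrors the override above: fetch the writable at the meta-column position
  static String metaFieldValue(ArrayWritable record, int pos) {
    return record.get()[pos].toString();
  }

  public static void main(String[] args) {
    // assumed layout for illustration: position 0 holds _hoodie_commit_time
    ArrayWritable record = new ArrayWritable(Writable.class,
        new Writable[] {new Text("20240101000000"), new Text("key1")});
    System.out.println(metaFieldValue(record, 0)); // prints the commit time string
  }
}
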
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 47e7f42..4211b24 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -138,7 +138,8 @@
baseFileInstantTime,
props,
readStats,
- Option.of("timestamp"));
+ Option.of("timestamp"),
+ false);
}
public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean shouldWriteRecordPositions,