[HUDI-9146][part2] Integrating FileGroup reader into Flink merge reader (#13343)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 0084fb9..56ebbc8 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -91,6 +91,11 @@
   }
 
   @Override
+  public String getMetaFieldValue(InternalRow record, int pos) {
+    return record.getString(pos);
+  }
+
+  @Override
   public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
     HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     if (bufferedRecord.isDelete()) {
@@ -162,4 +167,9 @@
     }
     return value;
   }
+
+  @Override
+  public InternalRow getDeleteRow(InternalRow record, String recordKey) {
+    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index ec9efab..1a08c69 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair}
@@ -35,7 +36,6 @@
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index ef100f1..c15d2b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -99,6 +99,11 @@
   }
 
   @Override
+  public IndexedRecord getDeleteRow(IndexedRecord record, String recordKey) {
+    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+  }
+
+  @Override
   public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
     switch (mergeMode) {
       case EVENT_TIME_ORDERING:
@@ -122,6 +127,11 @@
   }
 
   @Override
+  public String getMetaFieldValue(IndexedRecord record, int pos) {
+    return record.get(pos).toString();
+  }
+
+  @Override
   public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
     if (bufferedRecord.isDelete()) {
       return SpillableMapUtils.generateEmptyPayload(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 638c58a..61f2dea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,11 +23,13 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableFilterIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
@@ -46,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
+import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
@@ -74,6 +77,7 @@
   private Boolean needsBootstrapMerge = null;
   private Boolean shouldMergeUseRecordPosition = null;
   protected String partitionPath;
+  protected Option<InstantRange> instantRangeOpt = Option.empty();
 
   // for encoding and decoding schemas to the spillable map
   private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
@@ -208,6 +212,23 @@
   public abstract T convertAvroRecord(IndexedRecord avroRecord);
 
   public abstract GenericRecord convertToAvroRecord(T record, Schema schema);
+
+  /**
+   * There are two cases to handle:
+   * 1). Return the delete record if it's not null;
+   * 2). otherwise fills an empty row with record key fields and returns.
+   *
+   * <p>For case2, when `emitDelete` is true for FileGroup reader and payload for DELETE record is empty,
+   * a record key row is emitted to downstream to delete data from storage by record key with the best effort.
+   * Returns null if the primary key semantics been lost: the requested schema does not include all the record key fields.
+   *
+   * @param record    delete record
+   * @param recordKey record key
+   *
+   * @return Engine specific row which contains record key fields.
+   */
+  @Nullable
+  public abstract T getDeleteRow(T record, String recordKey);
   
   /**
    * @param mergeMode        record merge mode
@@ -229,6 +250,16 @@
   public abstract Object getValue(T record, Schema schema, String fieldName);
 
   /**
+   * Get value of metadata field in a more efficient way than #getValue.
+   *
+   * @param record The record in engine-specific type.
+   * @param pos    The position of the metadata field.
+   *
+   * @return The value for the target metadata field.
+   */
+  public abstract String getMetaFieldValue(T record, int pos);
+
+  /**
    * Cast to Java boolean value.
    * If the object is not compatible with boolean type, throws.
    */
@@ -242,6 +273,28 @@
   }
 
   /**
+   * Get the {@link InstantRange} filter.
+   */
+  public Option<InstantRange> getInstantRange() {
+    return instantRangeOpt;
+  }
+
+  /**
+   * Apply the {@link InstantRange} filter to the file record iterator.
+   *
+   * @param fileRecordIterator File record iterator.
+   *
+   * @return File record iterator filter by {@link InstantRange}.
+   */
+  public ClosableIterator<T> applyInstantRangeFilter(ClosableIterator<T> fileRecordIterator) {
+    InstantRange instantRange = getInstantRange().get();
+    final Schema.Field commitTimeField = schemaHandler.getRequiredSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+    final int commitTimePos = commitTimeField.pos();
+    Predicate<T> instantFilter = row -> instantRange.isInRange(getMetaFieldValue(row, commitTimePos));
+    return new CloseableFilterIterator<>(fileRecordIterator, instantFilter);
+  }
+
+  /**
    * Gets the record key in String.
    *
    * @param record The record in engine-specific type.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index e39dbf3..816ec21 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -90,6 +90,7 @@
 
   private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
   private final boolean hasBuiltInDelete;
+  private final int hoodieOperationPos;
 
   public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
                                       Schema tableSchema,
@@ -109,6 +110,7 @@
     this.customDeleteMarkerKeyValue = deleteConfigs.getLeft();
     this.hasBuiltInDelete = deleteConfigs.getRight();
     this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema());
+    this.hoodieOperationPos = Option.ofNullable(requiredSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)).map(Schema.Field::pos).orElse(-1);
     this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
     this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
     readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
@@ -150,6 +152,10 @@
     return hasBuiltInDelete;
   }
 
+  public int getHoodieOperationPos() {
+    return hoodieOperationPos;
+  }
+
   private InternalSchema pruneInternalSchema(Schema requiredSchema, Option<InternalSchema> internalSchemaOption) {
     if (!internalSchemaOption.isPresent()) {
       return InternalSchema.getEmptyInternalSchema();
@@ -172,8 +178,13 @@
 
   @VisibleForTesting
   Schema generateRequiredSchema() {
-    //might need to change this if other queries than mor have mandatory fields
+    boolean hasInstantRange = readerContext.getInstantRange().isPresent();
     if (!needsMORMerge) {
+      if (hasInstantRange && !findNestedField(requestedSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD).isPresent()) {
+        List<Schema.Field> addedFields = new ArrayList<>();
+        addedFields.add(getField(tableSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+        return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
+      }
       return requestedSchema;
     }
 
@@ -186,14 +197,9 @@
     List<Schema.Field> addedFields = new ArrayList<>();
     for (String field : getMandatoryFieldsForMerging(
         hoodieTableConfig, properties, tableSchema, recordMerger,
-        hasBuiltInDelete, customDeleteMarkerKeyValue)) {
+        hasBuiltInDelete, customDeleteMarkerKeyValue, hasInstantRange)) {
       if (!findNestedField(requestedSchema, field).isPresent()) {
-        Option<Schema.Field> foundFieldOpt = findNestedField(tableSchema, field);
-        if (!foundFieldOpt.isPresent()) {
-          throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
-        }
-        Schema.Field foundField = foundFieldOpt.get();
-        addedFields.add(foundField);
+        addedFields.add(getField(tableSchema, field));
       }
     }
 
@@ -209,7 +215,8 @@
                                                        Schema tableSchema,
                                                        Option<HoodieRecordMerger> recordMerger,
                                                        boolean hasBuiltInDelete,
-                                                       Option<Pair<String, String>> customDeleteMarkerKeyAndValue) {
+                                                       Option<Pair<String, String>> customDeleteMarkerKeyAndValue,
+                                                       boolean hasInstantRange) {
     Triple<RecordMergeMode, String, String> mergingConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
         cfg.getRecordMergeMode(),
         cfg.getPayloadClass(),
@@ -223,6 +230,11 @@
 
     // Use Set to avoid duplicated fields.
     Set<String> requiredFields = new HashSet<>();
+
+    if (hasInstantRange) {
+      requiredFields.add(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+    }
+
     // Add record key fields.
     if (cfg.populateMetaFields()) {
       requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
@@ -292,6 +304,17 @@
   }
 
   /**
+   * Get {@link Schema.Field} from {@link Schema} by field name.
+   */
+  private static Schema.Field getField(Schema schema, String fieldName) {
+    Option<Schema.Field> foundFieldOpt = findNestedField(schema, fieldName);
+    if (!foundFieldOpt.isPresent()) {
+      throw new IllegalArgumentException("Field: " + fieldName + " does not exist in the table schema");
+    }
+    return foundFieldOpt.get();
+  }
+
+  /**
    * Fetches the delete configs from the configs.
    *
    * @param props write and table configs that contain delete related properties
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 0f89a6c..020e19c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -86,6 +86,7 @@
   protected final HoodieReadStats readStats;
   protected final boolean shouldCheckCustomDeleteMarker;
   protected final boolean shouldCheckBuiltInDeleteMarker;
+  protected final boolean emitDelete;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<BufferedRecord<T>> logRecordIterator;
   protected T nextRecord;
@@ -99,7 +100,8 @@
                                   RecordMergeMode recordMergeMode,
                                   TypedProperties props,
                                   HoodieReadStats readStats,
-                                  Option<String> orderingFieldName) {
+                                  Option<String> orderingFieldName,
+                                  boolean emitDelete) {
     this.readerContext = readerContext;
     this.readerSchema = AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
     this.recordMergeMode = recordMergeMode;
@@ -120,6 +122,7 @@
     boolean isBitCaskDiskMapCompressionEnabled = props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
         DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
     this.readStats = readStats;
+    this.emitDelete = emitDelete;
     try {
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -163,6 +166,18 @@
     return columnValue != null && readerContext.castToBoolean(columnValue);
   }
 
+  /**
+   * Returns whether the record is a DELETE marked by the '_hoodie_operation' field.
+   */
+  protected final boolean isDeleteHoodieOperation(T record) {
+    int hoodieOperationPos = readerContext.getSchemaHandler().getHoodieOperationPos();
+    if (hoodieOperationPos < 0) {
+      return false;
+    }
+    String hoodieOperation = readerContext.getMetaFieldValue(record, hoodieOperationPos);
+    return hoodieOperation != null && HoodieOperation.isDeleteRecord(hoodieOperation);
+  }
+
   @Override
   public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
     this.baseFileIterator = baseFileIterator;
@@ -539,15 +554,20 @@
 
   protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInfo) throws IOException {
     if (logRecordInfo != null) {
-      BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, orderingFieldName, false);
-      Pair<Boolean, T> isDeleteAndRecord = merge(bufferedRecord, logRecordInfo);
+      BufferedRecord<T> baseRecordInfo = BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, orderingFieldName, false);
+      Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
         nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
         readStats.incrementNumUpdates();
         return true;
+      } else if (emitDelete) {
+        // emit Deletes
+        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
+        readStats.incrementNumDeletes();
+        return nextRecord != null;
       } else {
-        // Deletes
+        // not emit Deletes
         readStats.incrementNumDeletes();
         return false;
       }
@@ -570,6 +590,12 @@
         nextRecord = nextRecordInfo.getRecord();
         readStats.incrementNumInserts();
         return true;
+      } else if (emitDelete) {
+        nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey());
+        readStats.incrementNumDeletes();
+        if (nextRecord != null) {
+          return true;
+        }
       } else {
         readStats.incrementNumDeletes();
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index ed9e258..13dc39a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -112,13 +112,13 @@
       long start, long length, boolean shouldUseRecordPosition) {
     this(readerContext, storage, tablePath, latestCommitTime, fileSlice, dataSchema,
         requestedSchema, internalSchemaOpt, hoodieTableMetaClient, props, start, length,
-        shouldUseRecordPosition, false);
+        shouldUseRecordPosition, false, false);
   }
 
   private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, String tablePath,
                                String latestCommitTime, FileSlice fileSlice, Schema dataSchema, Schema requestedSchema,
                                Option<InternalSchema> internalSchemaOpt, HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
-                               long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants) {
+                               long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete) {
     this.readerContext = readerContext;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -169,7 +169,7 @@
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
         recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
-        isSkipMerge, shouldUseRecordPosition, readStats);
+        isSkipMerge, shouldUseRecordPosition, readStats, emitDelete);
     this.allowInflightInstants = allowInflightInstants;
   }
 
@@ -184,18 +184,19 @@
                                                    boolean hasNoLogFiles,
                                                    boolean isSkipMerge,
                                                    boolean shouldUseRecordPosition,
-                                                   HoodieReadStats readStats) {
+                                                   HoodieReadStats readStats,
+                                                   boolean emitDelete) {
     if (hasNoLogFiles) {
       return null;
     } else if (isSkipMerge) {
       return new UnmergedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, emitDelete);
     } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
       return new PositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName);
+          readerContext, hoodieTableMetaClient, recordMergeMode, baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName, emitDelete);
     } else {
       return new KeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName, emitDelete);
     }
   }
 
@@ -224,17 +225,19 @@
     }
 
     StoragePathInfo baseFileStoragePathInfo = baseFile.getPathInfo();
+    final ClosableIterator<T> recordIterator;
     if (baseFileStoragePathInfo != null) {
-      return readerContext.getFileRecordIterator(
+      recordIterator = readerContext.getFileRecordIterator(
           baseFileStoragePathInfo, start, length,
           readerContext.getSchemaHandler().getTableSchema(),
           readerContext.getSchemaHandler().getRequiredSchema(), storage);
     } else {
-      return readerContext.getFileRecordIterator(
+      recordIterator = readerContext.getFileRecordIterator(
           baseFile.getStoragePath(), start, length,
           readerContext.getSchemaHandler().getTableSchema(),
           readerContext.getSchemaHandler().getRequiredSchema(), storage);
     }
+    return readerContext.getInstantRange().isPresent() ? readerContext.applyInstantRangeFilter(recordIterator) : recordIterator;
   }
 
   private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
@@ -341,6 +344,7 @@
         .withLogFiles(logFiles)
         .withReverseReader(false)
         .withBufferSize(getIntWithAltKeys(props, HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
+        .withInstantRange(readerContext.getInstantRange())
         .withPartition(getRelativePartitionPath(
             new StoragePath(path), logFiles.get(0).getPath().getParent()))
         .withRecordBuffer(recordBuffer)
@@ -435,6 +439,7 @@
     private long length = Long.MAX_VALUE;
     private boolean shouldUseRecordPosition = false;
     private boolean allowInflightInstants = false;
+    private boolean emitDelete;
 
     public Builder<T> withReaderContext(HoodieReaderContext<T> readerContext) {
       this.readerContext = readerContext;
@@ -497,6 +502,11 @@
       return this;
     }
 
+    public Builder<T> withEmitDelete(boolean emitDelete) {
+      this.emitDelete = emitDelete;
+      return this;
+    }
+
     public HoodieFileGroupReader<T> build() {
       ValidationUtils.checkArgument(readerContext != null, "Reader context is required");
       ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table meta client is required");
@@ -515,7 +525,7 @@
       return new HoodieFileGroupReader<>(
           readerContext, storage, tablePath, latestCommitTime, fileSlice,
           dataSchema, requestedSchema, internalSchemaOpt, hoodieTableMetaClient,
-          props, start, length, shouldUseRecordPosition, allowInflightInstants);
+          props, start, length, shouldUseRecordPosition, allowInflightInstants, emitDelete);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 2ccc740..5bbce2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -53,8 +53,9 @@
                                        RecordMergeMode recordMergeMode,
                                        TypedProperties props,
                                        HoodieReadStats readStats,
-                                       Option<String> orderingFieldName) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
+                                       Option<String> orderingFieldName,
+                                       boolean emitDelete) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName, emitDelete);
   }
 
   @Override
@@ -77,7 +78,7 @@
     try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
       while (recordIterator.hasNext()) {
         T nextRecord = recordIterator.next();
-        boolean isDelete = isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord);
+        boolean isDelete = isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord) || isDeleteHoodieOperation(nextRecord);
         BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, orderingFieldName, isDelete);
         processNextDataRecord(bufferedRecord, bufferedRecord.getRecordKey());
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 5df2ea9..152b710 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -71,8 +71,9 @@
                                             String baseFileInstantTime,
                                             TypedProperties props,
                                             HoodieReadStats readStats,
-                                            Option<String> orderingFieldName) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
+                                            Option<String> orderingFieldName,
+                                            boolean emitDelete) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName, emitDelete);
     this.baseFileInstantTime = baseFileInstantTime;
   }
 
@@ -130,7 +131,7 @@
 
         long recordPosition = recordPositions.get(recordIndex++);
         T evolvedNextRecord = schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
-        boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord);
+        boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord) || isDeleteHoodieOperation(evolvedNextRecord);
         BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(evolvedNextRecord, schema, readerContext, orderingFieldName, isDelete);
         processNextDataRecord(bufferedRecord, recordPosition);
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index c853eb0..a9e1f66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -50,8 +50,9 @@
       HoodieTableMetaClient hoodieTableMetaClient,
       RecordMergeMode recordMergeMode,
       TypedProperties props,
-      HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, Option.empty());
+      HoodieReadStats readStats,
+      boolean emitDelete) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, Option.empty(), emitDelete);
     this.currentInstantLogBlocks = new ArrayDeque<>();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
new file mode 100644
index 0000000..c706d84
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+/**
+ * {@link FilterIterator} requiring to be closed after iteration (to cleanup resources)
+ */
+public class CloseableFilterIterator<R> extends FilterIterator<R> implements ClosableIterator<R> {
+
+  public CloseableFilterIterator(Iterator<R> source, Predicate<R> filter) {
+    super(source, filter);
+  }
+
+  @Override
+  public void close() {
+    ((ClosableIterator<R>) source).close();
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
index 4ebc7c1..37b5565 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
@@ -29,7 +29,7 @@
  */
 public class FilterIterator<R> implements Iterator<R> {
 
-  private final Iterator<R> source;
+  protected final Iterator<R> source;
 
   private final Predicate<R> filter;
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index 06552a9..71f0b35 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -153,6 +153,7 @@
                                            HoodieTableVersion tableVersion,
                                            String mergeStrategyId) {
     HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getInstantRange()).thenReturn(Option.empty());
     when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
     when(readerContext.getHasLogFiles()).thenReturn(true);
     HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
@@ -290,7 +291,8 @@
             RecordMergeMode.COMMIT_TIME_ORDERING,
             props,
             readStats,
-            Option.empty());
+            Option.empty(),
+            false);
     when(readerContext.getValue(any(), any(), any())).thenReturn(null);
     assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
 
@@ -302,7 +304,8 @@
             RecordMergeMode.COMMIT_TIME_ORDERING,
             props,
             readStats,
-            Option.empty());
+            Option.empty(),
+            false);
     when(readerContext.getValue(any(), any(), any())).thenReturn("i");
     assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
     when(readerContext.getValue(any(), any(), any())).thenReturn("d");
@@ -323,7 +326,8 @@
             RecordMergeMode.COMMIT_TIME_ORDERING,
             props,
             readStats,
-            Option.empty());
+            Option.empty(),
+            false);
 
     // CASE 1: With custom delete marker.
     GenericRecord record = new GenericData.Record(schema);
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index 5e7613f..565543e 100644
--- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -342,6 +342,7 @@
   /**
    * Returns the scanner to read avro log files.
    */
+  @Deprecated
   private static HoodieMergedLogRecordScanner getScanner(
       HoodieStorage storage,
       String basePath,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 690ef55..7159f9a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -185,7 +185,7 @@
       Supplier<InternalSchemaManager> internalSchemaManager = () -> InternalSchemaManager.get(conf, metaClient);
       // initialize storage conf lazily.
       StorageConfiguration<?> readerConf = writeClient.getEngineContext().getStorageConf();
-      return Option.of(new FlinkRowDataReaderContext(readerConf, internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig()));
+      return Option.of(new FlinkRowDataReaderContext(readerConf, internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(), Option.empty()));
     } else {
       // always using avro record merger for legacy compaction since log scanner do not support rowdata reading yet.
       writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 8088604..cfea2d2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -22,7 +22,6 @@
 import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -34,7 +33,9 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -43,7 +44,8 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.io.storage.HoodieIOFactory;
-import org.apache.hudi.source.ExpressionPredicates.Predicate;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -52,6 +54,7 @@
 import org.apache.hudi.util.RowDataUtils;
 import org.apache.hudi.util.RowProjection;
 import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
+import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -65,6 +68,7 @@
 import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -79,21 +83,28 @@
  * log files with Flink parquet reader.
  */
 public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
-  private final List<Predicate> predicates;
+  private final List<ExpressionPredicates.Predicate> predicates;
   private final Supplier<InternalSchemaManager> internalSchemaManager;
   private final boolean utcTimezone;
-  private final HoodieConfig hoodieConfig;
+  private final HoodieTableConfig tableConfig;
+  // the converter is used to create a RowData contains primary key fields only
+  // for DELETE cases, it'll not be initialized if primary key semantics is lost.
+  // For e.g, if the pk fields are [a, b] but user only select a, then the pk
+  // semantics is lost.
+  private StringToRowDataConverter recordKeyRowConverter;
 
   public FlinkRowDataReaderContext(
       StorageConfiguration<?> storageConfiguration,
       Supplier<InternalSchemaManager> internalSchemaManager,
-      List<Predicate> predicates,
-      HoodieTableConfig tableConfig) {
+      List<ExpressionPredicates.Predicate> predicates,
+      HoodieTableConfig tableConfig,
+      Option<InstantRange> instantRangeOpt) {
     super(storageConfiguration, tableConfig);
-    this.hoodieConfig = tableConfig;
+    this.tableConfig = tableConfig;
     this.internalSchemaManager = internalSchemaManager;
     this.predicates = predicates;
     this.utcTimezone = getStorageConfiguration().getBoolean(FlinkOptions.READ_UTC_TIMEZONE.key(), FlinkOptions.READ_UTC_TIMEZONE.defaultValue());
+    this.instantRangeOpt = instantRangeOpt;
   }
 
   @Override
@@ -111,9 +122,32 @@
     HoodieRowDataParquetReader rowDataParquetReader =
         (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
             .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
-            .getFileReader(hoodieConfig, filePath, HoodieFileFormat.PARQUET, Option.empty());
+            .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, Option.empty());
     DataType rowType = RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
-    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema);
+    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema, predicates);
+  }
+
+  @Override
+  public void setSchemaHandler(FileGroupReaderSchemaHandler<RowData> schemaHandler) {
+    super.setSchemaHandler(schemaHandler);
+
+    Option<String[]> recordKeysOpt = tableConfig.getRecordKeyFields();
+    if (recordKeysOpt.isEmpty()) {
+      return;
+    }
+    // primary key semantic is lost if not all primary key fields are included in the request schema.
+    boolean pkSemanticLost = Arrays.stream(recordKeysOpt.get()).anyMatch(k -> schemaHandler.getRequestedSchema().getField(k) == null);
+    if (pkSemanticLost) {
+      return;
+    }
+    // get primary key field position in required schema.
+    Schema requiredSchema = schemaHandler.getRequiredSchema();
+    int[] pkFieldsPos = Arrays.stream(recordKeysOpt.get())
+        .map(k -> Option.ofNullable(requiredSchema.getField(k)).map(Schema.Field::pos).orElse(-1))
+        .mapToInt(Integer::intValue)
+        .toArray();
+    recordKeyRowConverter = new StringToRowDataConverter(
+        pkFieldsPos, (RowType) RowDataAvroQueryContexts.fromAvroSchema(requiredSchema).getRowType().getLogicalType());
   }
 
   @Override
@@ -145,6 +179,11 @@
   }
 
   @Override
+  public String getMetaFieldValue(RowData record, int pos) {
+    return record.getString(pos).toString();
+  }
+
+  @Override
   public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> bufferedRecord) {
     HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     // delete record
@@ -257,6 +296,21 @@
   }
 
   @Override
+  public RowData getDeleteRow(RowData record, String recordKey) {
+    if (record != null) {
+      return record;
+    }
+    // don't need to emit record key row if primary key semantic is lost
+    if (recordKeyRowConverter == null) {
+      return null;
+    }
+    final String[] pkVals = KeyGenUtils.extractRecordKeys(recordKey);
+    RowData recordKeyRow = recordKeyRowConverter.convert(pkVals);
+    recordKeyRow.setRowKind(RowKind.DELETE);
+    return recordKeyRow;
+  }
+
+  @Override
   public RowData convertAvroRecord(IndexedRecord avroRecord) {
     Schema recordSchema = avroRecord.getSchema();
     AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone).getAvroToRowDataConverter();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 811f68d..b4e5d14 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -135,6 +135,7 @@
     }
   }
 
+  @Deprecated
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
@@ -163,6 +164,7 @@
         .build();
   }
 
+  @Deprecated
   public static HoodieMergedLogRecordScanner logScanner(
       List<String> logPaths,
       Schema logSchema,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index 2902aea..e68ebcc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -43,6 +44,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -81,7 +83,7 @@
 
   @Override
   public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), requestedSchema);
+    ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), requestedSchema, Collections.emptyList());
     readerIterators.add(rowDataItr);
     return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
   }
@@ -89,12 +91,16 @@
   @Override
   public ClosableIterator<String> getRecordKeyIterator() throws IOException {
     Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema);
+    ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema, Collections.emptyList());
     return new CloseableMappingIterator<>(rowDataItr, rowData -> Objects.toString(rowData.getString(0)));
   }
 
-  public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager internalSchemaManager, DataType dataType, Schema requestedSchema) throws IOException {
-    return RecordIterators.getParquetRecordIterator(storage.getConf(), internalSchemaManager, dataType, requestedSchema, path);
+  public ClosableIterator<RowData> getRowDataIterator(
+      InternalSchemaManager internalSchemaManager,
+      DataType dataType,
+      Schema requestedSchema,
+      List<Predicate> predicates) throws IOException {
+    return RecordIterators.getParquetRecordIterator(storage.getConf(), internalSchemaManager, dataType, requestedSchema, path, predicates);
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 8fd0f83..120e50e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -43,7 +43,6 @@
 import org.apache.parquet.hadoop.util.SerializationUtil;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +63,8 @@
       InternalSchemaManager internalSchemaManager,
       DataType dataType,
       Schema requestedSchema,
-      StoragePath path) throws IOException {
+      StoragePath path,
+      List<Predicate> predicates) throws IOException {
     List<String> fieldNames = ((RowType) dataType.getLogicalType()).getFieldNames();
     List<DataType> fieldTypes = dataType.getChildren();
     int[] selectedFields = requestedSchema.getFields().stream().map(Schema.Field::name)
@@ -87,7 +87,7 @@
         new org.apache.flink.core.fs.Path(path.toUri()),
         0L,
         Long.MAX_VALUE,
-        Collections.emptyList());
+        predicates);
   }
 
   public static ClosableIterator<RowData> getParquetRecordIterator(
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index e8b766b..84c9053 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -135,7 +135,7 @@
       return getBaseFileIteratorWithMetadata(split.getBasePath().get());
     } else if (!split.getBasePath().isPresent()) {
       // log files only
-      return new LogFileOnlyIterator(getFullLogFileIterator(split));
+      return getFullLogFileIterator(split);
     } else {
       Schema tableSchema = new Schema.Parser().parse(this.tableState.getAvroSchema());
       return new MergeIterator(
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index dcd7a5f..ae35cee 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -34,6 +34,7 @@
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -42,10 +43,8 @@
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FlinkRowDataReaderContext;
@@ -58,11 +57,9 @@
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
@@ -70,15 +67,12 @@
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -91,7 +85,6 @@
 
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 
 /**
  * The base InputFormat class to read from Hoodie data + log files.
@@ -208,51 +201,24 @@
   }
 
   protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit split) throws IOException {
-    if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
-      if (split.getInstantRange().isPresent()) {
-        // base file only with commit time filtering
-        return new BaseFileOnlyFilteringIterator(
-            split.getInstantRange().get(),
-            this.tableState.getRequiredRowType(),
-            this.requiredPos,
-            getBaseFileIterator(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
-      } else {
-        // base file only
-        return getBaseFileIterator(split.getBasePath().get());
-      }
-    } else if (!split.getBasePath().isPresent()) {
-      // log files only
+    String mergeType = split.getMergeType();
+    if (!split.getBasePath().isPresent()) {
       if (OptionsResolver.emitDeletes(conf)) {
-        return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+        mergeType = FlinkOptions.REALTIME_SKIP_MERGE;
       } else {
-        return new LogFileOnlyIterator(getLogFileIterator(split));
+        // always merge records in log files if there is no base file (aligned with legacy behaviour)
+        mergeType = FlinkOptions.REALTIME_PAYLOAD_COMBINE;
       }
-    } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
-      return new SkipMergeIterator(
-          getBaseFileIterator(split.getBasePath().get()),
-          getLogFileIterator(split));
-    } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
-      return new MergeIterator(
-          conf,
-          hadoopConf,
-          split,
-          this.tableState.getRowType(),
-          this.tableState.getRequiredRowType(),
-          new Schema.Parser().parse(this.tableState.getAvroSchema()),
-          new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
-          internalSchemaManager.getQuerySchema(),
-          this.requiredPos,
-          this.emitDelete,
-          this.tableState.getOperationPos(),
-          getBaseFileIteratorWithMetadata(split.getBasePath().get()));
-    } else {
-      throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
-          + "file path: " + split.getBasePath()
-          + "log paths: " + split.getLogPaths()
-          + "hoodie table path: " + split.getTablePath()
-          + "flink partition Index: " + split.getSplitNumber()
-          + "merge type: " + split.getMergeType());
     }
+    ValidationUtils.checkArgument(
+        mergeType.equals(FlinkOptions.REALTIME_SKIP_MERGE) || mergeType.equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE),
+        "Unable to select an Iterator to read the Hoodie MOR File Split for "
+            + "file path: " + split.getBasePath()
+            + "log paths: " + split.getLogPaths()
+            + "hoodie table path: " + split.getTablePath()
+            + "flink partition Index: " + split.getSplitNumber()
+            + "merge type: " + split.getMergeType());
+    return getSplitIterator(split, mergeType);
   }
 
   @Override
@@ -363,93 +329,15 @@
         predicates);
   }
 
-  private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
-    final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
-    final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
-    final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
-    final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
-        AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf);
-    final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
-    final int[] pkOffset = tableState.getPkOffsetsInRequired();
-    // flag saying whether the pk semantics has been dropped by user specified
-    // projections. For e.g, if the pk fields are [a, b] but user only select a,
-    // then the pk semantics is lost.
-    final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
-    final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
-    final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
-
-    return new ClosableIterator<RowData>() {
-      private RowData currentRecord;
-
-      @Override
-      public boolean hasNext() {
-        while (logRecordsKeyIterator.hasNext()) {
-          String curAvroKey = logRecordsKeyIterator.next();
-          Option<IndexedRecord> curAvroRecord = null;
-          final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) scanner.getRecords().get(curAvroKey);
-          try {
-            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
-          } catch (IOException e) {
-            throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
-          }
-          if (!curAvroRecord.isPresent()) {
-            // delete record found
-            if (emitDelete && !pkSemanticLost) {
-              GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
-
-              final String recordKey = hoodieRecord.getRecordKey();
-              final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
-              final Object[] converted = converter.convert(pkFields);
-              for (int i = 0; i < pkOffset.length; i++) {
-                delete.setField(pkOffset[i], converted[i]);
-              }
-              delete.setRowKind(RowKind.DELETE);
-
-              this.currentRecord = delete;
-              return true;
-            }
-            // skipping if the condition is unsatisfied
-            // continue;
-          } else {
-            final IndexedRecord avroRecord = curAvroRecord.get();
-            final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos());
-            if (rowKind == RowKind.DELETE && !emitDelete) {
-              // skip the delete record
-              continue;
-            }
-            GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
-                avroRecord,
-                requiredSchema,
-                requiredPos,
-                recordBuilder);
-            currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
-            currentRecord.setRowKind(rowKind);
-            return true;
-          }
-        }
-        return false;
-      }
-
-      @Override
-      public RowData next() {
-        return currentRecord;
-      }
-
-      @Override
-      public void close() {
-        scanner.close();
-      }
-    };
-  }
-
   /**
    * Get record iterator using {@link HoodieFileGroupReader}.
    *
    * @param split input split
-   * @return {@link RowData} iterator.
+   * @param mergeType merge type for FileGroup reader
+   *
+   * @return {@link RowData} iterator for the given split.
    */
-  private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
+  private ClosableIterator<RowData> getSplitIterator(MergeOnReadInputSplit split, String mergeType) throws IOException {
     final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
 
@@ -466,11 +354,12 @@
             HadoopFSUtils.getStorageConf(hadoopConf),
             () -> internalSchemaManager,
             predicates,
-            metaClient.getTableConfig());
+            metaClient.getTableConfig(),
+            split.getInstantRange());
     TypedProperties typedProps = FlinkClientUtil.getMergedTableAndWriteProps(metaClient.getTableConfig(), writeConfig);
-    typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
+    typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
 
-    try (HoodieFileGroupReader<RowData> fileGroupReader = HoodieFileGroupReader.<RowData>newBuilder()
+    HoodieFileGroupReader<RowData> fileGroupReader = HoodieFileGroupReader.<RowData>newBuilder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(split.getLatestCommit())
@@ -480,11 +369,9 @@
         .withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
         .withProps(typedProps)
         .withShouldUseRecordPosition(false)
-        .build()) {
-      return fileGroupReader.getClosableIterator();
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to compact file slice: " + fileSlice, e);
-    }
+        .withEmitDelete(emitDelete)
+        .build();
+    return fileGroupReader.getClosableIterator();
   }
 
   protected static Option<IndexedRecord> getInsertVal(HoodieAvroRecord<?> hoodieRecord, Schema tableSchema) {
@@ -551,130 +438,6 @@
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
-  /**
-   * Base record iterator with instant time filtering.
-   */
-  static class BaseFileOnlyFilteringIterator implements ClosableIterator<RowData> {
-    // base file record iterator
-    private final ClosableIterator<RowData> nested;
-    private final InstantRange instantRange;
-    private final RowDataProjection projection;
-
-    private RowData currentRecord;
-
-    private int commitTimePos;
-
-    BaseFileOnlyFilteringIterator(
-        InstantRange instantRange,
-        RowType requiredRowType,
-        int[] requiredPos,
-        ClosableIterator<RowData> nested) {
-      this.nested = nested;
-      this.instantRange = instantRange;
-      this.commitTimePos = getCommitTimePos(requiredPos);
-      int[] positions;
-      if (commitTimePos < 0) {
-        commitTimePos = 0;
-        positions = IntStream.range(1, 1 + requiredPos.length).toArray();
-      } else {
-        positions = IntStream.range(0, requiredPos.length).toArray();
-      }
-      this.projection = RowDataProjection.instance(requiredRowType, positions);
-    }
-
-    @Override
-    public boolean hasNext() {
-      while (this.nested.hasNext()) {
-        currentRecord = this.nested.next();
-        boolean isInRange = instantRange.isInRange(currentRecord.getString(commitTimePos).toString());
-        if (isInRange) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      // can promote: no need to project with null instant range
-      return projection.project(currentRecord);
-    }
-
-    @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
-      }
-    }
-  }
-
-  protected static class LogFileOnlyIterator implements ClosableIterator<RowData> {
-    // iterator for log files
-    private final ClosableIterator<RowData> iterator;
-
-    public LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return this.iterator.hasNext();
-    }
-
-    @Override
-    public RowData next() {
-      return this.iterator.next();
-    }
-
-    @Override
-    public void close() {
-      if (this.iterator != null) {
-        this.iterator.close();
-      }
-    }
-  }
-
-  static class SkipMergeIterator implements ClosableIterator<RowData> {
-    // base file record iterator
-    private final ClosableIterator<RowData> nested;
-    // iterator for log files
-    private final ClosableIterator<RowData> iterator;
-
-    private RowData currentRecord;
-
-    SkipMergeIterator(ClosableIterator<RowData> nested, ClosableIterator<RowData> iterator) {
-      this.nested = nested;
-      this.iterator = iterator;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (this.nested.hasNext()) {
-        currentRecord = this.nested.next();
-        return true;
-      }
-      if (this.iterator.hasNext()) {
-        currentRecord = this.iterator.next();
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public RowData next() {
-      return currentRecord;
-    }
-
-    @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
-      }
-      if (this.iterator != null) {
-        this.iterator.close();
-      }
-    }
-  }
 
   protected static class MergeIterator implements ClosableIterator<RowData> {
     // base file record iterator
@@ -710,26 +473,6 @@
         RowType tableRowType,
         RowType requiredRowType,
         Schema tableSchema,
-        Schema requiredSchema,
-        InternalSchema querySchema,
-        int[] requiredPos,
-        boolean emitDelete,
-        int operationPos,
-        ClosableIterator<RowData> nested) { // the iterator should be with full schema
-      this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema,
-          querySchema,
-          Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
-          Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))),
-          emitDelete, operationPos, nested);
-    }
-
-    public MergeIterator(
-        Configuration flinkConf,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        MergeOnReadInputSplit split,
-        RowType tableRowType,
-        RowType requiredRowType,
-        Schema tableSchema,
         InternalSchema querySchema,
         Option<RowDataProjection> projection,
         Option<Function<IndexedRecord, GenericRecord>> avroProjection,
@@ -905,25 +648,6 @@
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
-    if (getCommitTimePos(requiredPos) >= 0) {
-      return requiredPos;
-    }
-    int[] requiredPos2 = new int[requiredPos.length + 1];
-    requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
-    System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
-    return requiredPos2;
-  }
-
-  private static int getCommitTimePos(int[] requiredPos) {
-    for (int i = 0; i < requiredPos.length; i++) {
-      if (requiredPos[i] == HOODIE_COMMIT_TIME_COL_POS) {
-        return i;
-      }
-    }
-    return -1;
-  }
-
   @VisibleForTesting
   public void isEmitDelete(boolean emitDelete) {
     this.emitDelete = emitDelete;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 36dfecb..7f55b38 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -20,11 +20,9 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -89,29 +87,4 @@
         .mapToInt(i -> i)
         .toArray();
   }
-
-  /**
-   * Get the primary key positions in required row type.
-   */
-  public int[] getPkOffsetsInRequired() {
-    final List<String> fieldNames = requiredRowType.getFieldNames();
-    return Arrays.stream(pkFields)
-        .map(fieldNames::indexOf)
-        .mapToInt(i -> i)
-        .toArray();
-  }
-
-  /**
-   * Returns the primary key fields logical type with given offsets.
-   *
-   * @param pkOffsets the pk offsets in required row type
-   * @return pk field logical types
-   * @see #getPkOffsetsInRequired()
-   */
-  public LogicalType[] getPkTypes(int[] pkOffsets) {
-    final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
-        .map(RowType.RowField::getType).toArray(LogicalType[]::new);
-    return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
-        .toArray(LogicalType[]::new);
-  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
index 6c4aae3..b9f1baf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
@@ -22,10 +22,13 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
 import java.math.BigDecimal;
@@ -44,22 +47,26 @@
 @Internal
 public class StringToRowDataConverter {
   private final Converter[] converters;
+  private final int[] fieldsPos;
+  private final int rowArity;
 
-  public StringToRowDataConverter(LogicalType[] fieldTypes) {
-    this.converters = Arrays.stream(fieldTypes)
-        .map(StringToRowDataConverter::getConverter)
+  public StringToRowDataConverter(int[] fieldsPos, RowType rowType) {
+    this.fieldsPos = fieldsPos;
+    this.rowArity = rowType.getFieldCount();
+    this.converters = Arrays.stream(fieldsPos)
+        .mapToObj(f -> getConverter(rowType.getTypeAt(f)))
         .toArray(Converter[]::new);
   }
 
-  public Object[] convert(String[] fields) {
+  public RowData convert(String[] fields) {
     ValidationUtils.checkArgument(converters.length == fields.length,
         "Field types and values should equal with number");
 
-    Object[] converted = new Object[fields.length];
+    GenericRowData rowData = new GenericRowData(rowArity);
     for (int i = 0; i < fields.length; i++) {
-      converted[i] = converters[i].convert(fields[i]);
+      rowData.setField(fieldsPos[i], converters[i].convert(fields[i]));
     }
-    return converted;
+    return rowData;
   }
 
   private interface Converter {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 0d3fde9..e915581 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -110,7 +110,8 @@
         storageConf,
         () -> InternalSchemaManager.DISABLED,
         Collections.emptyList(),
-        metaClient.getTableConfig());
+        metaClient.getTableConfig(),
+        Option.empty());
   }
 
   @Override
@@ -164,7 +165,7 @@
     HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
     when(tableConfig.populateMetaFields()).thenReturn(true);
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
     Schema schema = SchemaBuilder.builder()
         .record("test")
         .fields()
@@ -182,7 +183,7 @@
     HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
     when(tableConfig.populateMetaFields()).thenReturn(true);
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
     Schema schema = SchemaBuilder.builder()
         .record("test")
         .fields()
@@ -200,7 +201,7 @@
     when(tableConfig.populateMetaFields()).thenReturn(false);
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"field1"}));
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
     Schema schema = SchemaBuilder.builder()
         .record("test")
         .fields()
@@ -218,7 +219,7 @@
     when(tableConfig.populateMetaFields()).thenReturn(false);
     when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"field1", "field2"}));
     FlinkRowDataReaderContext readerContext =
-        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+        new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
 
     Schema schema = SchemaBuilder.builder()
         .record("test")
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index bbf7e04..e5a497e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -305,6 +305,31 @@
   }
 
   @Test
+  void testReadWithDeletes() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+    List<RowData> result = readData(inputFormat);
+
+    final String actual = TestData.rowDataToString(result);
+    final String expected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+        + "-D[id3, null, null, null, null], "
+        + "-D[id5, null, null, null, null], "
+        + "-D[id9, null, null, null, null]]";
+    assertThat(actual, is(expected));
+  }
+
+  @Test
   void testReadWithDeletesMOR() throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 1c671e3..ad8d2d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -982,6 +982,7 @@
   /**
    * Returns the scanner to read avro log files.
    */
+  @Deprecated
   private static HoodieMergedLogRecordScanner getScanner(
       HoodieStorage storage,
       String basePath,
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
index 8f7ecad..b5f140b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
@@ -27,6 +27,7 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -41,7 +42,7 @@
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Test cases for {@link StringToRowDataConverter}.
@@ -59,8 +60,9 @@
         DataTypes.TIMESTAMP(6).getLogicalType(),
         DataTypes.DECIMAL(7, 2).getLogicalType()
     };
-    StringToRowDataConverter converter = new StringToRowDataConverter(fieldTypes);
-    Object[] converted = converter.convert(fields);
+    RowType rowType = RowType.of(fieldTypes);
+    StringToRowDataConverter converter = new StringToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
+    RowData actual = converter.convert(fields);
     Object[] expected = new Object[] {
         1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(),
         LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY),
@@ -68,7 +70,8 @@
         TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z")),
         DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)
     };
-    assertArrayEquals(expected, converted);
+    GenericRowData expectedRow = GenericRowData.of(expected);
+    assertEquals(expectedRow, actual);
   }
 
   @Test
@@ -97,15 +100,10 @@
     GenericRecord avroRecord =
         (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
     StringToRowDataConverter stringToRowDataConverter =
-        new StringToRowDataConverter(rowType.getChildren().toArray(new LogicalType[0]));
+        new StringToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
     final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames(), false);
     final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
-    Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
-
-    GenericRowData converted = new GenericRowData(7);
-    for (int i = 0; i < 7; i++) {
-      converted.setField(i, convertedKeys[i]);
-    }
+    RowData converted = stringToRowDataConverter.convert(recordKeys);
     assertThat(converted, is(rowData));
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index fa05136..8941d6b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -178,6 +178,11 @@
   }
 
   @Override
+  public ArrayWritable getDeleteRow(ArrayWritable record, String recordKey) {
+    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+  }
+
+  @Override
   public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
     // TODO(HUDI-7843):
     // get rid of event time and commit time ordering. Just return Option.empty
@@ -203,6 +208,11 @@
   }
 
   @Override
+  public String getMetaFieldValue(ArrayWritable record, int pos) {
+    return record.get()[pos].toString();
+  }
+
+  @Override
   public boolean castToBoolean(Object value) {
     if (value instanceof BooleanWritable) {
       return ((BooleanWritable) value).get();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 47e7f42..4211b24 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -138,7 +138,8 @@
         baseFileInstantTime,
         props,
         readStats,
-        Option.of("timestamp"));
+        Option.of("timestamp"),
+        false);
   }
 
   public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean shouldWriteRecordPositions,