[HUDI-1809] Flink merge on read input split uses wrong base file path for default merge type (#2846)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 12bebdf..1186cff 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -29,6 +29,7 @@
 import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
 import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.util.StringToRowDataConverter;
@@ -63,6 +64,7 @@
 
 import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
 import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
 import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 
@@ -180,7 +182,7 @@
           new Schema.Parser().parse(this.tableState.getAvroSchema()),
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
           this.requiredPos,
-          getFullSchemaReader(split.getTablePath()));
+          getFullSchemaReader(split.getBasePath().get()));
     } else {
       throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
           + "file path: " + split.getBasePath()
@@ -337,7 +339,7 @@
             // efficient.
             if (split.getInstantRange().isPresent()) {
               // based on the fact that commit time is always the first field
-              String commitTime = curAvroRecord.get().get(0).toString();
+              String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString();
               if (!split.getInstantRange().get().isInRange(commitTime)) {
                 // filter out the records that are not in range
                 return hasNext();
@@ -431,6 +433,11 @@
     // iterator for log files
     private final Iterator<RowData> iterator;
 
+    // this flag is needed because the Flink ParquetColumnarRowSplitReader is buggy:
+    // method #reachedEnd() may return false again after it has already returned true.
+    // refactor this out once FLINK-22370 is resolved.
+    private boolean readLogs = false;
+
     private RowData currentRecord;
 
     SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
@@ -440,10 +447,11 @@
 
     @Override
     public boolean reachedEnd() throws IOException {
-      if (!this.reader.reachedEnd()) {
+      if (!readLogs && !this.reader.reachedEnd()) {
         currentRecord = this.reader.nextRecord();
         return false;
       }
+      readLogs = true;
       if (this.iterator.hasNext()) {
         currentRecord = this.iterator.next();
         return false;
@@ -479,6 +487,12 @@
     private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
     private final GenericRecordBuilder recordBuilder;
 
+    private final RowDataProjection projection;
+    // this flag is needed because the Flink ParquetColumnarRowSplitReader is buggy:
+    // method #reachedEnd() may return false again after it has already returned true.
+    // refactor this out once FLINK-22370 is resolved.
+    private boolean readLogs = false;
+
     private Set<String> keyToSkip = new HashSet<>();
 
     private RowData currentRecord;
@@ -501,11 +515,12 @@
       this.recordBuilder = new GenericRecordBuilder(requiredSchema);
       this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+      this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
     }
 
     @Override
     public boolean reachedEnd() throws IOException {
-      if (!this.reader.reachedEnd()) {
+      if (!readLogs && !this.reader.reachedEnd()) {
         currentRecord = this.reader.nextRecord();
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
         if (logRecords.containsKey(curKey)) {
@@ -524,19 +539,18 @@
             return false;
           }
         }
+        // project the full-schema base record onto the required positions
+        currentRecord = projection.project(currentRecord);
         return false;
       } else {
-        if (logKeysIterator.hasNext()) {
+        readLogs = true;
+        while (logKeysIterator.hasNext()) {
           final String curKey = logKeysIterator.next();
-          if (keyToSkip.contains(curKey)) {
-            return reachedEnd();
-          } else {
+          if (!keyToSkip.contains(curKey)) {
             Option<IndexedRecord> insertAvroRecord =
                 logRecords.get(curKey).getData().getInsertValue(tableSchema);
-            if (!insertAvroRecord.isPresent()) {
-              // stand alone delete record, skipping
-              return reachedEnd();
-            } else {
+            if (insertAvroRecord.isPresent()) {
+              // a missing insertAvroRecord means the record is a DELETE and is skipped
               GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
                   insertAvroRecord.get(),
                   requiredSchema,
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
new file mode 100644
index 0000000..67bb8ca
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Utilities to project the row data with given positions.
+ */
+public class RowDataProjection {
+  private final RowData.FieldGetter[] fieldGetters;
+
+  private RowDataProjection(LogicalType[] types, int[] positions) {
+    ValidationUtils.checkArgument(types.length == positions.length,
+        "types and positions should have the equal number");
+    this.fieldGetters = new RowData.FieldGetter[types.length];
+    for (int i = 0; i < types.length; i++) {
+      final LogicalType type = types[i];
+      final int pos = positions[i];
+      this.fieldGetters[i] = RowData.createFieldGetter(type, pos);
+    }
+  }
+
+  public static RowDataProjection instance(RowType rowType, int[] positions) {
+    final LogicalType[] types = rowType.getChildren().toArray(new LogicalType[0]);
+    return new RowDataProjection(types, positions);
+  }
+
+  /**
+   * Returns the projected row data.
+   */
+  public RowData project(RowData rowData) {
+    GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length);
+    for (int i = 0; i < this.fieldGetters.length; i++) {
+      final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
+      genericRowData.setField(i, val);
+    }
+    return genericRowData;
+  }
+}
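
For context, a minimal usage sketch of the new RowDataProjection added above (not part of the
patch; the names requiredRowType and requiredPos mirror the variables used in
MergeOnReadInputFormat, while the concrete schema and values here are made up for illustration):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    import org.apache.hudi.util.RowDataProjection;

    public class RowDataProjectionExample {
      public static void main(String[] args) {
        // full base-file row layout: (id INT, name STRING, age INT)
        RowData fullRow = GenericRowData.of(1, StringData.fromString("alice"), 30);

        // required read schema keeps only (name, age), found at positions 1 and 2 of the full row
        RowType requiredRowType = RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType());
        int[] requiredPos = {1, 2};

        // build the projection once, then apply it to every full-schema record read from the base file
        RowDataProjection projection = RowDataProjection.instance(requiredRowType, requiredPos);
        RowData projected = projection.project(fullRow); // -> (alice, 30)
        System.out.println(projected);
      }
    }

This is how the merge iterator uses it above: base-file rows come back in the full table schema,
and the projection narrows them to the required read positions before they are returned.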
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 5e324de..fcfa7cf 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -118,6 +118,7 @@
 
     // write parquet first with compaction
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();