[HUDI-1809] Flink merge on read input split uses wrong base file path for default merge type (#2846)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 12bebdf..1186cff 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -29,6 +29,7 @@
import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;
@@ -63,6 +64,7 @@
import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
@@ -180,7 +182,7 @@
new Schema.Parser().parse(this.tableState.getAvroSchema()),
new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
this.requiredPos,
- getFullSchemaReader(split.getTablePath()));
+ getFullSchemaReader(split.getBasePath().get()));
} else {
throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
+ "file path: " + split.getBasePath()
@@ -337,7 +339,7 @@
// efficient.
if (split.getInstantRange().isPresent()) {
// based on the fact that commit time is always the first field
- String commitTime = curAvroRecord.get().get(0).toString();
+ String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString();
if (!split.getInstantRange().get().isInRange(commitTime)) {
// filter out the records that are not in range
return hasNext();
@@ -431,6 +433,11 @@
// iterator for log files
private final Iterator<RowData> iterator;
+ // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
+ // method #reachedEnd() returns false after it returns true.
+ // refactor it out once FLINK-22370 is resolved.
+ private boolean readLogs = false;
+
private RowData currentRecord;
SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
@@ -440,10 +447,11 @@
@Override
public boolean reachedEnd() throws IOException {
- if (!this.reader.reachedEnd()) {
+ if (!readLogs && !this.reader.reachedEnd()) {
currentRecord = this.reader.nextRecord();
return false;
}
+ readLogs = true;
if (this.iterator.hasNext()) {
currentRecord = this.iterator.next();
return false;
@@ -479,6 +487,12 @@
private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
private final GenericRecordBuilder recordBuilder;
+ private final RowDataProjection projection;
+ // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
+ // method #reachedEnd() returns false after it returns true.
+ // refactor it out once FLINK-22370 is resolved.
+ private boolean readLogs = false;
+
private Set<String> keyToSkip = new HashSet<>();
private RowData currentRecord;
@@ -501,11 +515,12 @@
this.recordBuilder = new GenericRecordBuilder(requiredSchema);
this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+ this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
}
@Override
public boolean reachedEnd() throws IOException {
- if (!this.reader.reachedEnd()) {
+ if (!readLogs && !this.reader.reachedEnd()) {
currentRecord = this.reader.nextRecord();
final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
if (logRecords.containsKey(curKey)) {
@@ -524,19 +539,18 @@
return false;
}
}
+ // project the full record in base with required positions
+ currentRecord = projection.project(currentRecord);
return false;
} else {
- if (logKeysIterator.hasNext()) {
+ readLogs = true;
+ while (logKeysIterator.hasNext()) {
final String curKey = logKeysIterator.next();
- if (keyToSkip.contains(curKey)) {
- return reachedEnd();
- } else {
+ if (!keyToSkip.contains(curKey)) {
Option<IndexedRecord> insertAvroRecord =
logRecords.get(curKey).getData().getInsertValue(tableSchema);
- if (!insertAvroRecord.isPresent()) {
- // stand alone delete record, skipping
- return reachedEnd();
- } else {
+ if (insertAvroRecord.isPresent()) {
+ // the record is a DELETE if insertAvroRecord not present, skipping
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
insertAvroRecord.get(),
requiredSchema,
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
new file mode 100644
index 0000000..67bb8ca
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Utilities to project the row data with given positions.
+ */
+public class RowDataProjection {
+ private final RowData.FieldGetter[] fieldGetters;
+
+ private RowDataProjection(LogicalType[] types, int[] positions) {
+ ValidationUtils.checkArgument(types.length == positions.length,
+ "types and positions should have the equal number");
+ this.fieldGetters = new RowData.FieldGetter[types.length];
+ for (int i = 0; i < types.length; i++) {
+ final LogicalType type = types[i];
+ final int pos = positions[i];
+ this.fieldGetters[i] = RowData.createFieldGetter(type, pos);
+ }
+ }
+
+ public static RowDataProjection instance(RowType rowType, int[] positions) {
+ final LogicalType[] types = rowType.getChildren().toArray(new LogicalType[0]);
+ return new RowDataProjection(types, positions);
+ }
+
+ /**
+ * Returns the projected row data.
+ */
+ public RowData project(RowData rowData) {
+ GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length);
+ for (int i = 0; i < this.fieldGetters.length; i++) {
+ final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
+ genericRowData.setField(i, val);
+ }
+ return genericRowData;
+ }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 5e324de..fcfa7cf 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -118,6 +118,7 @@
// write parquet first with compaction
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();