/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.mor;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
/**
* The base InputFormat class to read from Hoodie data + log files.
*
 * <p>Uses {@link org.apache.flink.formats.parquet.utils.ParquetRecordReader}
 * to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
 * and overrides {@link #createInputSplits(int)} and {@link #close()} to change the default behaviors.
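 *
 * <p>A minimal usage sketch (illustrative only; the {@code conf}, {@code tableState} and
 * {@code rowDataType} variables as well as the concrete option values are assumptions, not
 * part of this class):
 * <pre>{@code
 * MergeOnReadInputFormat format = MergeOnReadInputFormat.builder()
 *     .config(conf)                          // Flink Configuration with FlinkOptions.PATH set
 *     .paths(new Path[] {new Path(conf.getString(FlinkOptions.PATH))})
 *     .tableState(tableState)                // schemas, required positions and input splits
 *     .fieldTypes(rowDataType.getChildren()) // full field data types of the table row type
 *     .defaultPartName("default")            // placeholder default partition name
 *     .limit(-1L)                            // no limit push down
 *     .emitDelete(false)
 *     .build();
 * }</pre>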
*/
public class MergeOnReadInputFormat
extends RichInputFormat<RowData, MergeOnReadInputSplit> {
private static final long serialVersionUID = 1L;
private final Configuration conf;
private transient org.apache.hadoop.conf.Configuration hadoopConf;
private Path[] paths;
private final MergeOnReadTableState tableState;
/**
   * Uniform iterator view over the underlying records.
*/
private transient RecordIterator iterator;
  // for projection push down
/**
   * Full field names.
*/
private final List<String> fieldNames;
/**
* Full field data types.
*/
private final List<DataType> fieldTypes;
/**
* Default partition name when the field value is null.
*/
private final String defaultPartName;
/**
* Required field positions.
*/
private final int[] requiredPos;
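  // Illustrative example for requiredPos (the field names are assumptions): with a full
  // schema [uuid, name, age, ts] and a query that selects [age, uuid], the required
  // positions would be [2, 0].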
// for limit push down
/**
* Limit for the reader, -1 when the reading is not limited.
*/
private final long limit;
/**
* Recording the current read count for limit check.
*/
private long currentReadCount = 0;
/**
   * Flag saying whether to emit the delete records. In streaming read mode, the downstream
   * operators need the delete messages to retract the previously accumulated results.
*/
private boolean emitDelete;
private MergeOnReadInputFormat(
Configuration conf,
Path[] paths,
MergeOnReadTableState tableState,
List<DataType> fieldTypes,
String defaultPartName,
long limit,
boolean emitDelete) {
this.conf = conf;
this.paths = paths;
this.tableState = tableState;
this.fieldNames = tableState.getRowType().getFieldNames();
this.fieldTypes = fieldTypes;
this.defaultPartName = defaultPartName;
    // Needs improvement: this requiredPos is only suitable for the parquet reader.
this.requiredPos = tableState.getRequiredPositions();
this.limit = limit;
this.emitDelete = emitDelete;
}
/**
* Returns the builder for {@link MergeOnReadInputFormat}.
*/
public static Builder builder() {
return new Builder();
}
@Override
public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L;
this.hadoopConf = StreamerUtil.getHadoopConf();
if (!split.getLogPaths().isPresent()) {
// base file only
this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
} else if (!split.getBasePath().isPresent()) {
// log files only
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
this.iterator = new SkipMergeIterator(
getRequiredSchemaReader(split.getBasePath().get()),
getLogFileIterator(split));
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
this.iterator = new MergeIterator(
hadoopConf,
split,
this.tableState.getRowType(),
this.tableState.getRequiredRowType(),
new Schema.Parser().parse(this.tableState.getAvroSchema()),
new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
this.requiredPos,
getFullSchemaReader(split.getBasePath().get()));
} else {
      throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
          + "file path: " + split.getBasePath()
          + ", log paths: " + split.getLogPaths()
          + ", hoodie table path: " + split.getTablePath()
          + ", flink partition index: " + split.getSplitNumber()
          + ", merge type: " + split.getMergeType());
}
}
@Override
public void configure(Configuration configuration) {
if (this.paths.length == 0) {
// file path was not specified yet. Try to set it from the parameters.
String filePath = configuration.getString(FlinkOptions.PATH, null);
if (filePath == null) {
throw new IllegalArgumentException("File path was not specified in input format or configuration.");
} else {
this.paths = new Path[] {new Path(filePath)};
}
}
    // may support nested files in the future.
}
@Override
public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
// statistics not supported yet.
return null;
}
@Override
public MergeOnReadInputSplit[] createInputSplits(int minNumSplits) {
return this.tableState.getInputSplits().toArray(new MergeOnReadInputSplit[0]);
}
@Override
public InputSplitAssigner getInputSplitAssigner(MergeOnReadInputSplit[] mergeOnReadInputSplits) {
return new DefaultInputSplitAssigner(mergeOnReadInputSplits);
}
@Override
public boolean reachedEnd() throws IOException {
if (limit > 0 && currentReadCount >= limit) {
return true;
} else {
      // check whether the underlying iterator has reached the end
return this.iterator.reachedEnd();
}
}
@Override
public RowData nextRecord(RowData o) {
currentReadCount++;
return this.iterator.nextRecord();
}
@Override
public void close() throws IOException {
if (this.iterator != null) {
this.iterator.close();
}
this.iterator = null;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private ParquetColumnarRowSplitReader getFullSchemaReader(String path) throws IOException {
return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray());
}
private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) throws IOException {
return getReader(path, this.requiredPos);
}
private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos) throws IOException {
// generate partition specs.
LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
new org.apache.hadoop.fs.Path(path).getParent(),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
defaultPartName.equals(v) ? null : v,
fieldTypes.get(fieldNames.indexOf(k)))));
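    // Illustrative example (the path and field name are assumptions): for a hive-style
    // partition path like ".../par1=20210101/f1.parquet" with partition field "par1",
    // partSpec is {par1=20210101}; the value is restored to the field's data type, and a
    // value equal to defaultPartName is restored as null.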
return ParquetSplitReaderUtil.genPartColumnarRowReader(
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
true,
FormatUtils.getParquetConf(this.conf, hadoopConf),
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new DataType[0]),
partObjects,
requiredPos,
DEFAULT_SIZE,
new org.apache.flink.core.fs.Path(path),
0,
Long.MAX_VALUE); // read the whole file
}
private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
    // flag saying whether the pk semantics has been dropped by the user-specified
    // projection. For example, if the pk fields are [a, b] but the user only selects a,
    // then the pk semantics is lost.
final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
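    // Illustrative example (the field names are assumptions): with pk fields [a, b] and
    // required fields [b, c], pkOffset would be [-1, 0], so the pk semantics is considered
    // lost and delete records cannot be emitted for this split.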
final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
return new Iterator<RowData>() {
private RowData currentRecord;
@Override
public boolean hasNext() {
if (logRecordsKeyIterator.hasNext()) {
          String curAvroKey = logRecordsKeyIterator.next();
          Option<IndexedRecord> curAvroRecord = null;
          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
          try {
            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
          } catch (IOException e) {
            throw new HoodieException("Failed to get the Avro insert value for key: " + curAvroKey, e);
}
if (!curAvroRecord.isPresent()) {
if (emitDelete && !pkSemanticLost) {
GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
final String recordKey = hoodieRecord.getRecordKey();
final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
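              // Illustrative (assuming a complex record key of the form "a:1,b:2"):
              // extractRecordKeys returns the value parts ["1", "2"] in pk field order.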
final Object[] converted = converter.convert(pkFields);
for (int i = 0; i < pkOffset.length; i++) {
delete.setField(pkOffset[i], converted[i]);
}
delete.setRowKind(RowKind.DELETE);
this.currentRecord = delete;
return true;
} else {
// delete record found, skipping
return hasNext();
}
} else {
            // this can be improved once the log scanner supports seeking by log blocks
            // with commit time, which would be more efficient.
if (split.getInstantRange().isPresent()) {
// based on the fact that commit time is always the first field
String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString();
if (!split.getInstantRange().get().isInRange(commitTime)) {
// filter out the records that are not in range
return hasNext();
}
}
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
curAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
return true;
}
} else {
return false;
}
}
@Override
public RowData next() {
return currentRecord;
}
};
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
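  /**
   * Uniform record iterator contract used by this input format: {@link #reachedEnd()} may
   * advance the underlying reader(s) and buffer the next record, and {@link #nextRecord()}
   * returns the record buffered by the preceding {@code reachedEnd()} call.
   */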
private interface RecordIterator {
boolean reachedEnd() throws IOException;
RowData nextRecord();
void close() throws IOException;
}
static class BaseFileOnlyIterator implements RecordIterator {
// base file reader
private final ParquetColumnarRowSplitReader reader;
BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
this.reader = reader;
}
@Override
public boolean reachedEnd() throws IOException {
return this.reader.reachedEnd();
}
@Override
public RowData nextRecord() {
return this.reader.nextRecord();
}
@Override
public void close() throws IOException {
if (this.reader != null) {
this.reader.close();
}
}
}
static class LogFileOnlyIterator implements RecordIterator {
// iterator for log files
private final Iterator<RowData> iterator;
LogFileOnlyIterator(Iterator<RowData> iterator) {
this.iterator = iterator;
}
@Override
public boolean reachedEnd() {
return !this.iterator.hasNext();
}
@Override
public RowData nextRecord() {
return this.iterator.next();
}
@Override
public void close() {
// no operation
}
}
static class SkipMergeIterator implements RecordIterator {
// base file reader
private final ParquetColumnarRowSplitReader reader;
// iterator for log files
private final Iterator<RowData> iterator;
// add the flag because the flink ParquetColumnarRowSplitReader is buggy:
// method #reachedEnd() returns false after it returns true.
// refactor it out once FLINK-22370 is resolved.
private boolean readLogs = false;
private RowData currentRecord;
SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
this.reader = reader;
this.iterator = iterator;
}
@Override
public boolean reachedEnd() throws IOException {
if (!readLogs && !this.reader.reachedEnd()) {
currentRecord = this.reader.nextRecord();
return false;
}
readLogs = true;
if (this.iterator.hasNext()) {
currentRecord = this.iterator.next();
return false;
}
return true;
}
@Override
public RowData nextRecord() {
return currentRecord;
}
@Override
public void close() throws IOException {
if (this.reader != null) {
this.reader.close();
}
}
}
static class MergeIterator implements RecordIterator {
// base file reader
private final ParquetColumnarRowSplitReader reader;
// log keys used for merging
private final Iterator<String> logKeysIterator;
// log records
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords;
private final Schema tableSchema;
private final Schema requiredSchema;
private final int[] requiredPos;
private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
private final GenericRecordBuilder recordBuilder;
private final RowDataProjection projection;
// add the flag because the flink ParquetColumnarRowSplitReader is buggy:
// method #reachedEnd() returns false after it returns true.
// refactor it out once FLINK-22370 is resolved.
private boolean readLogs = false;
    private final Set<String> keyToSkip = new HashSet<>();
private RowData currentRecord;
MergeIterator(
org.apache.hadoop.conf.Configuration hadoopConf,
MergeOnReadInputSplit split,
RowType tableRowType,
RowType requiredRowType,
Schema tableSchema,
Schema requiredSchema,
int[] requiredPos,
        ParquetColumnarRowSplitReader reader) { // the reader should be created with the full schema
this.tableSchema = tableSchema;
this.reader = reader;
this.logRecords = FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
this.logKeysIterator = this.logRecords.keySet().iterator();
this.requiredSchema = requiredSchema;
this.requiredPos = requiredPos;
this.recordBuilder = new GenericRecordBuilder(requiredSchema);
this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
}
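    // Merge strategy (summarizing the logic below): first drain the base file. A base row
    // whose record key appears in the log records is combined with the log payload (the key
    // is remembered in keyToSkip); if the combined value is absent the row was deleted and
    // is skipped, otherwise it is converted to the required schema. Base rows without a
    // matching log record are projected to the required positions. Once the base file is
    // exhausted, the remaining log-only records that are not deletes are emitted.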
@Override
public boolean reachedEnd() throws IOException {
if (!readLogs && !this.reader.reachedEnd()) {
currentRecord = this.reader.nextRecord();
final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
if (logRecords.containsKey(curKey)) {
keyToSkip.add(curKey);
Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(currentRecord, curKey);
if (!mergedAvroRecord.isPresent()) {
// deleted
return reachedEnd();
} else {
GenericRecord record = buildAvroRecordBySchema(
mergedAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
this.currentRecord = (RowData) avroToRowDataConverter.convert(record);
return false;
}
}
        // project the full base record to the required positions
currentRecord = projection.project(currentRecord);
return false;
} else {
readLogs = true;
while (logKeysIterator.hasNext()) {
final String curKey = logKeysIterator.next();
if (!keyToSkip.contains(curKey)) {
Option<IndexedRecord> insertAvroRecord =
logRecords.get(curKey).getData().getInsertValue(tableSchema);
            // if insertAvroRecord is not present, the record is a DELETE and is skipped
            if (insertAvroRecord.isPresent()) {
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
insertAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
return false;
}
}
}
return true;
}
}
@Override
public RowData nextRecord() {
return currentRecord;
}
@Override
public void close() throws IOException {
if (this.reader != null) {
this.reader.close();
}
}
private Option<IndexedRecord> mergeRowWithLog(
RowData curRow,
String curKey) throws IOException {
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
return logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
}
}
/**
* Builder for {@link MergeOnReadInputFormat}.
*/
public static class Builder {
private Configuration conf;
private Path[] paths;
private MergeOnReadTableState tableState;
private List<DataType> fieldTypes;
private String defaultPartName;
private long limit = -1;
private boolean emitDelete = false;
public Builder config(Configuration conf) {
this.conf = conf;
return this;
}
public Builder paths(Path[] paths) {
this.paths = paths;
return this;
}
public Builder tableState(MergeOnReadTableState tableState) {
this.tableState = tableState;
return this;
}
public Builder fieldTypes(List<DataType> fieldTypes) {
this.fieldTypes = fieldTypes;
return this;
}
public Builder defaultPartName(String defaultPartName) {
this.defaultPartName = defaultPartName;
return this;
}
public Builder limit(long limit) {
this.limit = limit;
return this;
}
public Builder emitDelete(boolean emitDelete) {
this.emitDelete = emitDelete;
return this;
}
public MergeOnReadInputFormat build() {
return new MergeOnReadInputFormat(conf, paths, tableState,
fieldTypes, defaultPartName, limit, emitDelete);
}
}
@VisibleForTesting
public void isEmitDelete(boolean emitDelete) {
this.emitDelete = emitDelete;
}
}