/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.mapreduce;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.MetastoreUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SerializationUtil;

/**
* Generic MRv2 InputFormat API for Iceberg.
*
* @param <T> the in-memory data model, which can be either Pig tuples or Hive rows; the default is Iceberg records
*/
public class IcebergInputFormat<T> extends InputFormat<Void, T> {
/**
* Configures the {@code Job} to use the {@code IcebergInputFormat} and
* returns a helper to add further configuration.
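*
* <p>A minimal usage sketch (assuming the {@code readFrom} and {@code select} methods of
* {@code InputFormatConfig.ConfigBuilder}; adjust to the builder methods actually available):
* <pre>
*   Job job = Job.getInstance(new Configuration());
*   IcebergInputFormat.configure(job)
*       .readFrom("/path/to/iceberg/table")
*       .select("id", "data");
* </pre>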
*
* @param job the {@code Job} to configure
*/
public static InputFormatConfig.ConfigBuilder configure(Job job) {
job.setInputFormatClass(IcebergInputFormat.class);
return new InputFormatConfig.ConfigBuilder(job.getConfiguration());
}
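
// Creates a table scan, applying the snapshot id (or ref name), branch and as-of-timestamp settings from the conf.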
private static TableScan createTableScan(Table table, Configuration conf) {
TableScan scan = table.newScan();
long snapshotId = -1;
try {
snapshotId = conf.getLong(InputFormatConfig.SNAPSHOT_ID, -1);
} catch (NumberFormatException e) {
String version = conf.get(InputFormatConfig.SNAPSHOT_ID);
SnapshotRef ref = table.refs().get(version);
if (ref == null) {
throw new RuntimeException("Cannot find matching snapshot ID or reference name for version " + version);
}
snapshotId = ref.snapshotId();
}
String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
if (StringUtils.isNotEmpty(branchName)) {
scan = scan.useRef(HiveUtils.getTableBranch(branchName));
}
if (snapshotId != -1) {
scan = scan.useSnapshot(snapshotId);
}
long asOfTime = conf.getLong(InputFormatConfig.AS_OF_TIMESTAMP, -1);
if (asOfTime != -1) {
scan = scan.asOfTime(asOfTime);
}
return scan;
}
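
// Creates an incremental append scan starting (exclusive) from the configured snapshot id.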
private static IncrementalAppendScan createIncrementalAppendScan(Table table, Configuration conf) {
long fromSnapshot = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
return table.newIncrementalAppendScan().fromSnapshotExclusive(fromSnapshot);
}
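
// Applies the common conf-driven settings (case sensitivity, split size, LLAP open file cost, projection,
// column selection and filter) to the scan.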
private static <T extends Scan<T, FileScanTask, CombinedScanTask>> Scan<T, FileScanTask, CombinedScanTask> applyConfig(
    Configuration conf, Scan<T, FileScanTask, CombinedScanTask> scanToConfigure) {
Scan<T, FileScanTask, CombinedScanTask> scan = scanToConfigure.caseSensitive(
conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
long splitSize = conf.getLong(InputFormatConfig.SPLIT_SIZE, 0);
if (splitSize > 0) {
scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
}
// In case of LLAP-based execution we ask Iceberg not to combine multiple file scan tasks into one split,
// so that cache affinity can work and each file (split) is always executed/cached on the same LLAP daemon.
MapWork mapWork = LlapHiveUtils.findMapWork((JobConf) conf);
if (mapWork != null && mapWork.getCacheAffinity()) {
// Iceberg splits logically consist of buckets, where the bucket size equals the openFileCost setting if the files
// assigned to such a bucket are smaller. This is how Iceberg combines multiple files into one split, so here
// we enforce that the bucket size equals the split size to avoid file combination.
Long openFileCost = splitSize > 0 ? splitSize : TableProperties.SPLIT_SIZE_DEFAULT;
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(openFileCost));
}
String schemaStr = conf.get(InputFormatConfig.READ_SCHEMA);
if (schemaStr != null) {
// Scans are immutable, so the result of project()/select() must be reassigned to take effect
scan = scan.project(SchemaParser.fromJson(schemaStr));
}
String[] selectedColumns = conf.getStrings(InputFormatConfig.SELECTED_COLUMNS);
if (selectedColumns != null) {
scan = scan.select(selectedColumns);
}
// TODO add a filter parser to get rid of Serialization
Expression filter = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
if (filter != null) {
// To prevent the filter expression from being attached to every generated file scan task, we call
// ignoreResiduals() here. The passed-in filter is still effective during split generation.
// On the execution side, residual expressions are mined from the passed job conf.
scan = scan.filter(filter).ignoreResiduals();
}
return scan;
}
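
// Plans the combined scan tasks for the table (regular or incremental append scan) and wraps each task
// into an IcebergSplit.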
@Override
public List<InputSplit> getSplits(JobContext context) {
Configuration conf = context.getConfiguration();
Table table = Optional
.ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
.orElseGet(() -> Catalogs.loadTable(conf));
List<InputSplit> splits = Lists.newArrayList();
boolean applyResidual = !conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
long fromVersion = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
Scan<?, FileScanTask, CombinedScanTask> scan;
if (fromVersion != -1) {
scan = applyConfig(conf, createIncrementalAppendScan(table, conf));
} else {
scan = applyConfig(conf, createTableScan(table, conf));
}
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
Table serializableTable = SerializableTable.copyOf(table);
tasksIterable.forEach(task -> {
if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
model == InputFormatConfig.InMemoryDataModel.PIG)) {
// TODO: We do not support residual evaluation for the HIVE and PIG in-memory data models yet
checkResiduals(task);
}
splits.add(new IcebergSplit(serializableTable, conf, task));
});
} catch (IOException e) {
throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
}
// If enabled, do not serialize the FileIO Hadoop config to decrease the split size.
// However, do not skip serialization for metadata table queries, because some metadata tasks cache the IO object and
// we would not be able to inject the config into those tasks on the deserializer side, unlike for standard queries.
if (scan instanceof DataTableScan) {
HiveIcebergStorageHandler.checkAndSkipIoConfigSerialization(conf, table);
}
return splits;
}
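
// Throws if any file scan task carries a non-trivial residual filter, since residual evaluation is not
// supported for the HIVE and PIG in-memory data models.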
private static void checkResiduals(CombinedScanTask task) {
task.files().forEach(fileScanTask -> {
Expression residual = fileScanTask.residual();
if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
throw new UnsupportedOperationException(
String.format(
"Filter expression %s is not completely satisfied. Additional rows " +
"can be returned not satisfied by the filter expression", residual));
}
});
}
@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new IcebergRecordReader<>();
}
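
// Record reader that iterates over the file scan tasks of a single IcebergSplit.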
private static final class IcebergRecordReader<T> extends RecordReader<Void, T> {
private static final String HIVE_VECTORIZED_READER_CLASS = "org.apache.iceberg.mr.hive.vector.HiveVectorizedReader";
private static final DynMethods.StaticMethod HIVE_VECTORIZED_READER_BUILDER;
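
// The vectorized reader implementation is only available with Hive 3, so it is loaded reflectively if present.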
static {
if (MetastoreUtil.hive3PresentOnClasspath()) {
HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
.impl(HIVE_VECTORIZED_READER_CLASS,
Table.class,
Path.class,
FileScanTask.class,
Map.class,
TaskAttemptContext.class,
Expression.class,
Schema.class)
.buildStatic();
} else {
HIVE_VECTORIZED_READER_BUILDER = null;
}
}
private TaskAttemptContext context;
private Configuration conf;
private Schema expectedSchema;
private String nameMapping;
private boolean reuseContainers;
private boolean caseSensitive;
private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
private Iterator<FileScanTask> tasks;
private T current;
private CloseableIterator<T> currentIterator;
private Table table;
private boolean fetchVirtualColumns;
@Override
public void initialize(InputSplit split, TaskAttemptContext newContext) {
// For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
CombinedScanTask task = ((IcebergSplit) split).task();
this.context = newContext;
this.conf = newContext.getConfiguration();
this.table = ((IcebergSplit) split).table();
HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
this.tasks = task.files().iterator();
this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
this.expectedSchema = readSchema(conf, table, caseSensitive);
this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
this.currentIterator = nextTask();
}
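
// Opens the next file scan task; for non-vectorized reads that request virtual (ACID) columns,
// wraps the iterator so those columns get populated.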
private CloseableIterator<T> nextTask() {
CloseableIterator<T> closeableIterator = open(tasks.next(), expectedSchema).iterator();
if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) {
return closeableIterator;
}
return new IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator, expectedSchema, conf);
}
@Override
public boolean nextKeyValue() throws IOException {
while (true) {
if (currentIterator.hasNext()) {
current = currentIterator.next();
return true;
} else if (tasks.hasNext()) {
currentIterator.close();
this.currentIterator = nextTask();
} else {
currentIterator.close();
return false;
}
}
}
@Override
public Void getCurrentKey() {
return null;
}
@Override
public T getCurrentValue() {
return current;
}
@Override
public float getProgress() {
// TODO: We could give a more accurate progress based on records read from the file. Context.getProgress does not
// have enough information to give an accurate progress value. This isn't that easy, since we don't know how much
// of the input split has been processed and we are pushing filters into Parquet and ORC. But we do know when a
// file is opened and could count the number of rows returned, so we can estimate. And we could also add a row
// count to the readers so that we can get an accurate count of rows that have been either returned or filtered
// out.
return context.getProgress();
}
@Override
public void close() throws IOException {
currentIterator.close();
}
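
// Opens the task through the dynamically loaded Hive vectorized reader; Avro files and Hive 2 are not supported.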
private CloseableIterable<T> openVectorized(FileScanTask task, Schema readSchema) {
Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO),
"Vectorized execution is not yet supported for Iceberg avro tables. " +
"Please turn off vectorization and retry the query.");
Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(),
"Vectorized read is unsupported for Hive 2 integration.");
Path path = new Path(task.file().path().toString());
Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
// TODO: We have to take care of the EncryptionManager when LLAP and vectorization are used
CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(table, path, task,
idToConstant, context, residual, readSchema);
return applyResidualFiltering(iterator, residual, readSchema);
}
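
// Opens the task with the generic Iceberg record readers; metadata table (DataTask) rows are served
// directly from the task instead of being read from files.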
private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) {
if (task.isDataTask()) {
// When querying metadata tables, the currentTask is a DataTask and the data has to
// be fetched from the task instead of reading it from files.
IcebergInternalRecordWrapper wrapper =
new IcebergInternalRecordWrapper(table.schema().asStruct(), readSchema.asStruct());
return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row));
}
DataFile file = task.file();
InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput(
table.io().newInputFile(file.path().toString()),
file.keyMetadata()));
CloseableIterable<T> iterable;
switch (file.format()) {
case AVRO:
iterable = newAvroIterable(inputFile, task, readSchema);
break;
case ORC:
iterable = newOrcIterable(inputFile, task, readSchema);
break;
case PARQUET:
iterable = newParquetIterable(inputFile, task, readSchema);
break;
default:
throw new UnsupportedOperationException(
String.format("Cannot read %s file: %s", file.format().name(), file.path()));
}
return iterable;
}
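
// Routes the task to the vectorized (HIVE) or generic reader based on the configured in-memory data model;
// the generic path also applies delete filters.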
@SuppressWarnings("unchecked")
private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
switch (inMemoryDataModel) {
case PIG:
// TODO: Support Pig and Hive object models for IcebergInputFormat
throw new UnsupportedOperationException("Pig and Hive object models are not supported.");
case HIVE:
return openVectorized(currentTask, readSchema);
case GENERIC:
DeleteFilter deletes = new GenericDeleteFilter(table.io(), currentTask, table.schema(), readSchema);
Schema requiredSchema = deletes.requiredSchema();
return deletes.filter(openGeneric(currentTask, requiredSchema));
default:
throw new UnsupportedOperationException("Unsupported memory model");
}
}
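
// Applies the residual filter on top of the reader output, unless residual filtering is configured to be skipped.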
private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, Expression residual,
Schema readSchema) {
boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
// Date and timestamp values are not the correct type for Evaluator.
// Wrapping to return the expected type.
InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct());
Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record)));
} else {
return iter;
}
}
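
// Builds an Avro reader for the task, honoring container reuse and name mapping settings.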
private CloseableIterable<T> newAvroIterable(
InputFile inputFile, FileScanTask task, Schema readSchema) {
Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
.project(readSchema)
.split(task.start(), task.length());
if (reuseContainers) {
avroReadBuilder.reuseContainers();
}
if (nameMapping != null) {
avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
avroReadBuilder.createReaderFunc(
(expIcebergSchema, expAvroSchema) ->
DataReader.create(expIcebergSchema, expAvroSchema,
constantsMap(task, IdentityPartitionConverters::convertConstant)));
return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema);
}
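
// Builds a Parquet reader for the task with residual filter push-down, container reuse and name mapping settings.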
private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
.project(readSchema)
.filter(residual)
.caseSensitive(caseSensitive)
.split(task.start(), task.length());
if (reuseContainers) {
parquetReadBuilder.reuseContainers();
}
if (nameMapping != null) {
parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
parquetReadBuilder.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(
readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
return applyResidualFiltering(parquetReadBuilder.build(), residual, readSchema);
}
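
// Builds an ORC reader for the task; constant and metadata columns are excluded from the file projection
// and filled in by the reader from the constants map.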
private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant);
Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
.project(readSchemaWithoutConstantAndMetadataFields)
.filter(residual)
.caseSensitive(caseSensitive)
.split(task.start(), task.length());
if (nameMapping != null) {
orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
orcReadBuilder.createReaderFunc(
fileSchema -> GenericOrcReader.buildReader(
readSchema, fileSchema, idToConstant));
return applyResidualFiltering(orcReadBuilder.build(), residual, readSchema);
}
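
// Computes the constant values for the task: the whole partition struct if the _partition metadata column is
// projected, the identity partition values if any are projected, otherwise an empty map.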
private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
if (expectedSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
Types.StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, converter);
} else if (projectsIdentityPartitionColumns) {
return PartitionUtil.constantsMap(task, converter);
} else {
return Collections.emptyMap();
}
}
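
// Resolves the read schema: an explicitly configured schema, the selected columns (optionally extended with
// virtual columns), or the full table schema.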
private static Schema readSchema(Configuration conf, Table table, boolean caseSensitive) {
Schema readSchema = InputFormatConfig.readSchema(conf);
if (readSchema != null) {
return readSchema;
}
String[] selectedColumns = InputFormatConfig.selectedColumns(conf);
if (selectedColumns == null) {
return table.schema();
}
readSchema = caseSensitive ? table.schema().select(selectedColumns) :
table.schema().caseInsensitiveSelect(selectedColumns);
if (InputFormatConfig.fetchVirtualColumns(conf)) {
return IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table);
}
return readSchema;
}
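
// Removes constant fields, metadata columns and the nested fields of the partition struct from the read schema.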
private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integer, ?> idToConstant) {
// remove the nested fields of the partition struct
Set<Integer> partitionFields = Optional.ofNullable(readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID))
.map(Types.NestedField::type)
.map(Type::asStructType)
.map(Types.StructType::fields)
.map(fields -> fields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet()))
.orElseGet(Collections::emptySet);
// remove constants and meta columns too
Set<Integer> collect = Stream.of(idToConstant.keySet(), MetadataColumns.metadataFieldIds(), partitionFields)
.flatMap(Set::stream)
.collect(Collectors.toSet());
return TypeUtil.selectNot(readSchema, collect);
}
}
}