/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
public class Parquet {
private Parquet() {
}
private static final Collection<String> READ_PROPERTIES_TO_REMOVE = Sets.newHashSet(
"parquet.read.filter", "parquet.private.read.filter.predicate", "parquet.read.support.class");
public static WriteBuilder write(OutputFile file) {
return new WriteBuilder(file);
}
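// Usage sketch (illustrative only): building a FileAppender for generic records with the builder
// below. Assumes an existing OutputFile `outputFile`, an Iceberg Schema `schema`, records of type
// org.apache.iceberg.data.Record, and the GenericParquetWriter factory that this class already
// imports for the position delete path; the table name "events" is hypothetical.
//
//   FileAppender<Record> appender = Parquet.write(outputFile)
//       .schema(schema)
//       .named("events")
//       .createWriterFunc(GenericParquetWriter::buildWriter)
//       .overwrite()
//       .build();
//   try {
//     appender.add(record);
//   } finally {
//     appender.close();
//   }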
public static class WriteBuilder {
private final OutputFile file;
private final Map<String, String> metadata = Maps.newLinkedHashMap();
private final Map<String, String> config = Maps.newLinkedHashMap();
private Schema schema = null;
private String name = "table";
private WriteSupport<?> writeSupport = null;
private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null;
private MetricsConfig metricsConfig = MetricsConfig.getDefault();
private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
private WriteBuilder(OutputFile file) {
this.file = file;
}
public WriteBuilder forTable(Table table) {
schema(table.schema());
setAll(table.properties());
metricsConfig(MetricsConfig.fromProperties(table.properties()));
return this;
}
public WriteBuilder schema(Schema newSchema) {
this.schema = newSchema;
return this;
}
public WriteBuilder named(String newName) {
this.name = newName;
return this;
}
public WriteBuilder writeSupport(WriteSupport<?> newWriteSupport) {
this.writeSupport = newWriteSupport;
return this;
}
public WriteBuilder set(String property, String value) {
config.put(property, value);
return this;
}
public WriteBuilder setAll(Map<String, String> properties) {
config.putAll(properties);
return this;
}
public WriteBuilder meta(String property, String value) {
metadata.put(property, value);
return this;
}
public WriteBuilder createWriterFunc(Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
this.createWriterFunc = newCreateWriterFunc;
return this;
}
public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
this.metricsConfig = newMetricsConfig;
return this;
}
public WriteBuilder overwrite() {
return overwrite(true);
}
public WriteBuilder overwrite(boolean enabled) {
this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
return this;
}
public WriteBuilder writerVersion(WriterVersion version) {
this.writerVersion = version;
return this;
}
@SuppressWarnings("unchecked")
private <T> WriteSupport<T> getWriteSupport(MessageType type) {
if (writeSupport != null) {
return (WriteSupport<T>) writeSupport;
} else {
return new AvroWriteSupport<>(
type,
ParquetAvro.parquetAvroSchema(AvroSchemaUtil.convert(schema, name)),
ParquetAvro.DEFAULT_MODEL);
}
}
/*
* Sets the writer version. Default value is PARQUET_1_0 (v1).
*/
@VisibleForTesting
WriteBuilder withWriterVersion(WriterVersion version) {
this.writerVersion = version;
return this;
}
// must remain private; only the data and delete write builders are expected to call this
private WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
this.createContextFunc = newCreateContextFunc;
return this;
}
public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be null");
// add the Iceberg schema to keyValueMetadata
meta("iceberg.schema", SchemaParser.toJson(schema));
// Map Iceberg properties to pass down to the Parquet writer
Context context = createContextFunc.apply(config);
int rowGroupSize = context.rowGroupSize();
int pageSize = context.pageSize();
int dictionaryPageSize = context.dictionaryPageSize();
String compressionLevel = context.compressionLevel();
CompressionCodecName codec = context.codec();
if (compressionLevel != null) {
switch (codec) {
case GZIP:
config.put("zlib.compress.level", compressionLevel);
break;
case BROTLI:
config.put("compression.brotli.quality", compressionLevel);
break;
case ZSTD:
config.put("io.compression.codec.zstd.level", compressionLevel);
break;
default:
// compression level is not supported; ignore it
}
}
set("parquet.avro.write-old-list-structure", "false");
MessageType type = ParquetSchemaUtil.convert(schema, name);
if (createWriterFunc != null) {
Preconditions.checkArgument(writeSupport == null,
"Cannot write with both write support and Parquet value writer");
Configuration conf;
if (file instanceof HadoopOutputFile) {
conf = ((HadoopOutputFile) file).getConf();
} else {
conf = new Configuration();
}
for (Map.Entry<String, String> entry : config.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
ParquetProperties parquetProperties = ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build();
return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
parquetProperties, metricsConfig, writeMode);
} else {
return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
.setType(type)
.setConfig(config)
.setKeyValueMetadata(metadata)
.setWriteSupport(getWriteSupport(type))
.withCompressionCodec(codec)
.withWriteMode(writeMode)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build(),
metricsConfig);
}
}
private static class Context {
private final int rowGroupSize;
private final int pageSize;
private final int dictionaryPageSize;
private final CompressionCodecName codec;
private final String compressionLevel;
private Context(int rowGroupSize, int pageSize, int dictionaryPageSize,
CompressionCodecName codec, String compressionLevel) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.dictionaryPageSize = dictionaryPageSize;
this.codec = codec;
this.compressionLevel = compressionLevel;
}
static Context dataContext(Map<String, String> config) {
int rowGroupSize = Integer.parseInt(config.getOrDefault(
PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT));
int pageSize = Integer.parseInt(config.getOrDefault(
PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT));
int dictionaryPageSize = Integer.parseInt(config.getOrDefault(
PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT));
String codecAsString = config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT);
CompressionCodecName codec = toCodec(codecAsString);
String compressionLevel = config.getOrDefault(PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT);
return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel);
}
static Context deleteContext(Map<String, String> config) {
// use the data config as the default for any delete config values that are not set
Context dataContext = dataContext(config);
int rowGroupSize = PropertyUtil.propertyAsInt(config,
DELETE_PARQUET_ROW_GROUP_SIZE_BYTES, dataContext.rowGroupSize());
int pageSize = PropertyUtil.propertyAsInt(config,
DELETE_PARQUET_PAGE_SIZE_BYTES, dataContext.pageSize());
int dictionaryPageSize = PropertyUtil.propertyAsInt(config,
DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize());
String codecAsString = config.get(DELETE_PARQUET_COMPRESSION);
CompressionCodecName codec = codecAsString != null ? toCodec(codecAsString) : dataContext.codec();
String compressionLevel = config.getOrDefault(DELETE_PARQUET_COMPRESSION_LEVEL, dataContext.compressionLevel());
return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel);
}
private static CompressionCodecName toCodec(String codecAsString) {
try {
return CompressionCodecName.valueOf(codecAsString.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString, e);
}
}
int rowGroupSize() {
return rowGroupSize;
}
int pageSize() {
return pageSize;
}
int dictionaryPageSize() {
return dictionaryPageSize;
}
CompressionCodecName codec() {
return codec;
}
String compressionLevel() {
return compressionLevel;
}
}
}
public static DataWriteBuilder writeData(OutputFile file) {
return new DataWriteBuilder(file);
}
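// Usage sketch: writing a data file that can later be committed to a table. Assumes an existing
// Table `table`, an OutputFile `outputFile`, and, for partitioned specs, a StructLike
// `partitionKey`; generic Record data and GenericParquetWriter are assumptions, not requirements.
//
//   DataWriter<Record> writer = Parquet.writeData(outputFile)
//       .forTable(table)                          // sets schema, spec, properties, metrics config
//       .withPartition(partitionKey)              // omit for unpartitioned specs
//       .createWriterFunc(GenericParquetWriter::buildWriter)
//       .overwrite()
//       .build();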
public static class DataWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
private PartitionSpec spec = null;
private StructLike partition = null;
private EncryptionKeyMetadata keyMetadata = null;
private SortOrder sortOrder = null;
private DataWriteBuilder(OutputFile file) {
this.appenderBuilder = write(file);
this.location = file.location();
}
public DataWriteBuilder forTable(Table table) {
schema(table.schema());
withSpec(table.spec());
setAll(table.properties());
metricsConfig(MetricsConfig.fromProperties(table.properties()));
return this;
}
public DataWriteBuilder schema(Schema newSchema) {
appenderBuilder.schema(newSchema);
return this;
}
public DataWriteBuilder set(String property, String value) {
appenderBuilder.set(property, value);
return this;
}
public DataWriteBuilder setAll(Map<String, String> properties) {
appenderBuilder.setAll(properties);
return this;
}
public DataWriteBuilder meta(String property, String value) {
appenderBuilder.meta(property, value);
return this;
}
public DataWriteBuilder overwrite() {
return overwrite(true);
}
public DataWriteBuilder overwrite(boolean enabled) {
appenderBuilder.overwrite(enabled);
return this;
}
public DataWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
appenderBuilder.metricsConfig(newMetricsConfig);
return this;
}
public DataWriteBuilder createWriterFunc(Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
appenderBuilder.createWriterFunc(newCreateWriterFunc);
return this;
}
public DataWriteBuilder withSpec(PartitionSpec newSpec) {
this.spec = newSpec;
return this;
}
public DataWriteBuilder withPartition(StructLike newPartition) {
this.partition = newPartition;
return this;
}
public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
this.keyMetadata = metadata;
return this;
}
public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
this.sortOrder = newSortOrder;
return this;
}
public <T> DataWriter<T> build() throws IOException {
Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
"Partition must not be null when creating data writer for partitioned spec");
FileAppender<T> fileAppender = appenderBuilder.build();
return new DataWriter<>(fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder);
}
}
public static DeleteWriteBuilder writeDeletes(OutputFile file) {
return new DeleteWriteBuilder(file);
}
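// Usage sketch: writing an equality delete file keyed on a single field. `deleteSchema` is assumed
// to be the projection of the table schema containing the equality column, and `idFieldId` its
// field id; both names are illustrative.
//
//   EqualityDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(outputFile)
//       .forTable(table)
//       .rowSchema(deleteSchema)                  // overrides the table schema set by forTable
//       .withPartition(partitionKey)
//       .createWriterFunc(GenericParquetWriter::buildWriter)
//       .equalityFieldIds(idFieldId)
//       .overwrite()
//       .buildEqualityWriter();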
public static class DeleteWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
private Function<MessageType, ParquetValueWriter<?>> createWriterFunc = null;
private Schema rowSchema = null;
private PartitionSpec spec = null;
private StructLike partition = null;
private EncryptionKeyMetadata keyMetadata = null;
private int[] equalityFieldIds = null;
private SortOrder sortOrder;
private Function<CharSequence, ?> pathTransformFunc = Function.identity();
private DeleteWriteBuilder(OutputFile file) {
this.appenderBuilder = write(file);
this.location = file.location();
}
public DeleteWriteBuilder forTable(Table table) {
rowSchema(table.schema());
withSpec(table.spec());
setAll(table.properties());
metricsConfig(MetricsConfig.fromProperties(table.properties()));
return this;
}
public DeleteWriteBuilder set(String property, String value) {
appenderBuilder.set(property, value);
return this;
}
public DeleteWriteBuilder setAll(Map<String, String> properties) {
appenderBuilder.setAll(properties);
return this;
}
public DeleteWriteBuilder meta(String property, String value) {
appenderBuilder.meta(property, value);
return this;
}
public DeleteWriteBuilder overwrite() {
return overwrite(true);
}
public DeleteWriteBuilder overwrite(boolean enabled) {
appenderBuilder.overwrite(enabled);
return this;
}
public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
// TODO: keep full metrics for position delete file columns
appenderBuilder.metricsConfig(newMetricsConfig);
return this;
}
public DeleteWriteBuilder createWriterFunc(Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
this.createWriterFunc = newCreateWriterFunc;
return this;
}
public DeleteWriteBuilder rowSchema(Schema newSchema) {
this.rowSchema = newSchema;
return this;
}
public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
this.spec = newSpec;
return this;
}
public DeleteWriteBuilder withPartition(StructLike key) {
this.partition = key;
return this;
}
public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
this.keyMetadata = metadata;
return this;
}
public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
return this;
}
public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
this.equalityFieldIds = fieldIds;
return this;
}
public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
this.pathTransformFunc = newPathTransformFunc;
return this;
}
public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
this.sortOrder = newSortOrder;
return this;
}
public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
Preconditions.checkState(createWriterFunc != null,
"Cannot create equality delete file unless createWriterFunc is set");
Preconditions.checkArgument(spec != null,
"Spec must not be null when creating equality delete writer");
Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
"Partition must not be null for partitioned writes");
meta("delete-type", "equality");
meta("delete-field-ids", IntStream.of(equalityFieldIds)
.mapToObj(Objects::toString)
.collect(Collectors.joining(", ")));
// the appender uses the row schema without extra columns
appenderBuilder.schema(rowSchema);
appenderBuilder.createWriterFunc(createWriterFunc);
appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
return new EqualityDeleteWriter<>(
appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata,
sortOrder, equalityFieldIds);
}
public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
Preconditions.checkArgument(spec != null,
"Spec must not be null when creating position delete writer");
Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
"Partition must not be null for partitioned writes");
meta("delete-type", "position");
if (rowSchema != null && createWriterFunc != null) {
// the appender uses the row schema wrapped with position fields
appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema));
appenderBuilder.createWriterFunc(parquetSchema -> {
ParquetValueWriter<?> writer = createWriterFunc.apply(parquetSchema);
if (writer instanceof StructWriter) {
return new PositionDeleteStructWriter<T>((StructWriter<?>) writer, pathTransformFunc);
} else {
throw new UnsupportedOperationException("Cannot wrap writer for position deletes: " + writer.getClass());
}
});
} else {
appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
appenderBuilder.createWriterFunc(parquetSchema ->
new PositionDeleteStructWriter<T>((StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema),
Function.identity()));
}
appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
return new PositionDeleteWriter<>(
appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata);
}
}
private static class ParquetWriteBuilder<T> extends ParquetWriter.Builder<T, ParquetWriteBuilder<T>> {
private Map<String, String> keyValueMetadata = Maps.newHashMap();
private Map<String, String> config = Maps.newHashMap();
private MessageType type;
private WriteSupport<T> writeSupport;
private ParquetWriteBuilder(org.apache.parquet.io.OutputFile path) {
super(path);
}
@Override
protected ParquetWriteBuilder<T> self() {
return this;
}
public ParquetWriteBuilder<T> setKeyValueMetadata(Map<String, String> keyValueMetadata) {
this.keyValueMetadata = keyValueMetadata;
return self();
}
public ParquetWriteBuilder<T> setConfig(Map<String, String> config) {
this.config = config;
return self();
}
public ParquetWriteBuilder<T> setType(MessageType type) {
this.type = type;
return self();
}
public ParquetWriteBuilder<T> setWriteSupport(WriteSupport<T> writeSupport) {
this.writeSupport = writeSupport;
return self();
}
@Override
protected WriteSupport<T> getWriteSupport(Configuration configuration) {
for (Map.Entry<String, String> entry : config.entrySet()) {
configuration.set(entry.getKey(), entry.getValue());
}
return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
}
}
public static ReadBuilder read(InputFile file) {
return new ReadBuilder(file);
}
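// Usage sketch: reading generic records back with a projection and an optional filter. The reader
// factory shown, GenericParquetReaders.buildReader(schema, fileType) from the iceberg-data module,
// and the Expressions helper from org.apache.iceberg.expressions are assumptions; any
// Function<MessageType, ParquetValueReader<?>> works with createReaderFunc.
//
//   try (CloseableIterable<Record> reader = Parquet.read(inputFile)
//       .project(projection)
//       .filter(Expressions.equal("id", 5))
//       .createReaderFunc(fileType -> GenericParquetReaders.buildReader(projection, fileType))
//       .build()) {
//     for (Record record : reader) {
//       // process record
//     }
//   }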
public static class ReadBuilder {
private final InputFile file;
private final Map<String, String> properties = Maps.newHashMap();
private Long start = null;
private Long length = null;
private Schema schema = null;
private Expression filter = null;
private ReadSupport<?> readSupport = null;
private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
private boolean filterRecords = true;
private boolean caseSensitive = true;
private boolean callInit = false;
private boolean reuseContainers = false;
private int maxRecordsPerBatch = 10000;
private NameMapping nameMapping = null;
private ReadBuilder(InputFile file) {
this.file = file;
}
/**
* Restricts the read to the given range: [start, start + length).
*
* @param newStart the start position for this read
* @param newLength the length of the range this read should scan
* @return this builder for method chaining
*/
public ReadBuilder split(long newStart, long newLength) {
this.start = newStart;
this.length = newLength;
return this;
}
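// For example, a scan task's offset and length can be applied here (the `task` variable is
// hypothetical):
//
//   Parquet.read(inputFile).split(task.start(), task.length())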
public ReadBuilder project(Schema newSchema) {
this.schema = newSchema;
return this;
}
public ReadBuilder caseInsensitive() {
return caseSensitive(false);
}
public ReadBuilder caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}
public ReadBuilder filterRecords(boolean newFilterRecords) {
this.filterRecords = newFilterRecords;
return this;
}
public ReadBuilder filter(Expression newFilter) {
this.filter = newFilter;
return this;
}
public ReadBuilder readSupport(ReadSupport<?> newReadSupport) {
this.readSupport = newReadSupport;
return this;
}
public ReadBuilder createReaderFunc(Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
Preconditions.checkArgument(this.batchedReaderFunc == null,
"Reader function cannot be set since the batched version is already set");
this.readerFunc = newReaderFunction;
return this;
}
public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
Preconditions.checkArgument(this.readerFunc == null,
"Batched reader function cannot be set since the non-batched version is already set");
this.batchedReaderFunc = func;
return this;
}
public ReadBuilder set(String key, String value) {
properties.put(key, value);
return this;
}
public ReadBuilder callInit() {
this.callInit = true;
return this;
}
public ReadBuilder reuseContainers() {
this.reuseContainers = true;
return this;
}
public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
this.maxRecordsPerBatch = numRowsPerBatch;
return this;
}
public ReadBuilder withNameMapping(NameMapping newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
public <D> CloseableIterable<D> build() {
if (readerFunc != null || batchedReaderFunc != null) {
ParquetReadOptions.Builder optionsBuilder;
if (file instanceof HadoopInputFile) {
// remove read properties already set that may conflict with this read
Configuration conf = new Configuration(((HadoopInputFile) file).getConf());
for (String property : READ_PROPERTIES_TO_REMOVE) {
conf.unset(property);
}
optionsBuilder = HadoopReadOptions.builder(conf);
} else {
optionsBuilder = ParquetReadOptions.builder();
}
for (Map.Entry<String, String> entry : properties.entrySet()) {
optionsBuilder.set(entry.getKey(), entry.getValue());
}
if (start != null) {
optionsBuilder.withRange(start, start + length);
}
ParquetReadOptions options = optionsBuilder.build();
if (batchedReaderFunc != null) {
return new VectorizedParquetReader<>(file, schema, options, batchedReaderFunc, nameMapping, filter,
reuseContainers, caseSensitive, maxRecordsPerBatch);
} else {
return new org.apache.iceberg.parquet.ParquetReader<>(
file, schema, options, readerFunc, nameMapping, filter, reuseContainers, caseSensitive);
}
}
ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
builder.project(schema);
if (readSupport != null) {
builder.readSupport((ReadSupport<D>) readSupport);
} else {
builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL));
}
// default options for readers
builder.set("parquet.strict.typing", "false") // allow type promotion
.set("parquet.avro.compatible", "false") // use the new RecordReader with Utf8 support
.set("parquet.avro.add-list-element-records", "false"); // assume that lists use a 3-level schema
for (Map.Entry<String, String> entry : properties.entrySet()) {
builder.set(entry.getKey(), entry.getValue());
}
if (filter != null) {
// TODO: should not need to get the schema to push down before opening the file.
// Parquet should allow setting a filter inside its read support
MessageType type;
try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
type = schemaReader.getFileMetaData().getSchema();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
Schema fileSchema = ParquetSchemaUtil.convert(type);
builder.useStatsFilter()
.useDictionaryFilter()
.useRecordFilter(filterRecords)
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
.useDictionaryFilter(false)
.useRecordFilter(false);
}
if (callInit) {
builder.callInit();
}
if (start != null) {
builder.withFileRange(start, start + length);
}
if (nameMapping != null) {
builder.withNameMapping(nameMapping);
}
return new ParquetIterable<>(builder);
}
}
private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
private Schema schema = null;
private ReadSupport<T> readSupport = null;
private boolean callInit = false;
private NameMapping nameMapping = null;
private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
super(file);
}
public ParquetReadBuilder<T> project(Schema newSchema) {
this.schema = newSchema;
return this;
}
public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}
public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
this.readSupport = newReadSupport;
return this;
}
public ParquetReadBuilder<T> callInit() {
this.callInit = true;
return this;
}
@Override
protected ReadSupport<T> getReadSupport() {
return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping);
}
}
/**
* Combines several files into one.
*
* @param inputFiles an {@link Iterable} of Parquet files. The order of iteration determines the order in which
* the contents of the files are read and written to the {@code outputFile}
* @param outputFile the output Parquet file containing all the data from {@code inputFiles}
* @param rowGroupSize the row group size to use when writing the {@code outputFile}
* @param schema the schema of the data
* @param metadata extra metadata to write in the footer of the {@code outputFile}
*/
public static void concat(Iterable<File> inputFiles, File outputFile, int rowGroupSize, Schema schema,
Map<String, String> metadata) throws IOException {
OutputFile file = Files.localOutput(outputFile);
ParquetFileWriter writer = new ParquetFileWriter(
ParquetIO.file(file), ParquetSchemaUtil.convert(schema, "table"),
ParquetFileWriter.Mode.CREATE, rowGroupSize, 0);
writer.start();
for (File inputFile : inputFiles) {
writer.appendFile(ParquetIO.file(Files.localInput(inputFile)));
}
writer.end(metadata);
}
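// Usage sketch: merging two local Parquet files into a single file with one schema and extra footer
// metadata. The file names and the 128 MB row group size are illustrative values.
//
//   Parquet.concat(
//       Arrays.asList(new File("part-0.parquet"), new File("part-1.parquet")),
//       new File("merged.parquet"),
//       128 * 1024 * 1024,
//       schema,
//       Maps.newHashMap());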
}