/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.pig;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.BinaryAsDecimalReader;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
import org.apache.iceberg.parquet.ParquetValueReaders.IntAsLongReader;
import org.apache.iceberg.parquet.ParquetValueReaders.IntegerAsDecimalReader;
import org.apache.iceberg.parquet.ParquetValueReaders.LongAsDecimalReader;
import org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader;
import org.apache.iceberg.parquet.ParquetValueReaders.RepeatedKeyValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders.RepeatedReader;
import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
import org.apache.iceberg.parquet.ParquetValueReaders.StringReader;
import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
import org.apache.iceberg.parquet.ParquetValueReaders.UnboxedReader;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
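/**
 * Builds {@link ParquetValueReader}s that materialize Parquet records as Pig {@link Tuple}s
 * matching an expected Iceberg {@link Schema}.
 */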
public class PigParquetReader {
private PigParquetReader() {
}
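/**
 * Returns a reader for the expected schema. When the file's Parquet schema carries Iceberg field
 * IDs, columns are matched to expected fields by ID; otherwise a fallback builder pairs readers
 * with the file's columns by position. Entries in {@code partitionValues}, keyed by field ID,
 * are returned as constants instead of being read from the file.
 */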
@SuppressWarnings("unchecked")
public static ParquetValueReader<Tuple> buildReader(
MessageType fileSchema, Schema expectedSchema, Map<Integer, Object> partitionValues) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
return (ParquetValueReader<Tuple>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new ReadBuilder(fileSchema, partitionValues));
} else {
return (ParquetValueReader<Tuple>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new FallbackReadBuilder(fileSchema, partitionValues));
}
}
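/** Builds readers for files whose Parquet schema has no Iceberg field IDs. */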
private static class FallbackReadBuilder extends ReadBuilder {
FallbackReadBuilder(MessageType type, Map<Integer, Object> partitionValues) {
super(type, partitionValues);
}
@Override
public ParquetValueReader<?> message(
Types.StructType expected, MessageType message, List<ParquetValueReader<?>> fieldReaders) {
// the top level matches by ID, but the remaining IDs are missing
return super.struct(expected, message, fieldReaders);
}
@Override
public ParquetValueReader<?> struct(
Types.StructType ignored, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
// the expected struct is ignored because nested fields are never found when the file
// schema has no field IDs; readers are instead paired with the file's columns by position
List<ParquetValueReader<?>> newFields = Lists.newArrayListWithExpectedSize(
fieldReaders.size());
List<Type> types = Lists.newArrayListWithExpectedSize(fieldReaders.size());
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
int fieldD = getMessageType().getMaxDefinitionLevel(path(fieldType.getName())) - 1;
newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
types.add(fieldType);
}
return new TupleReader(types, newFields);
}
}
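/**
 * Schema visitor that builds the tree of value readers, matching the file's columns to the
 * expected fields by ID, injecting constants for partition values and nulls for expected
 * fields that are missing from the file.
 */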
private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
private final Map<Integer, Object> partitionValues;
ReadBuilder(MessageType type, Map<Integer, Object> partitionValues) {
this.type = type;
this.partitionValues = partitionValues;
}
MessageType getMessageType() {
return this.type;
}
@Override
public ParquetValueReader<?> message(
Types.StructType expected, MessageType message, List<ParquetValueReader<?>> fieldReaders) {
return struct(expected, message.asGroupType(), fieldReaders);
}
@Override
public ParquetValueReader<?> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
}
List<Types.NestedField> expectedFields = expected != null ?
expected.fields() : ImmutableList.of();
List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (partitionValues.containsKey(id)) {
// the value may be null so containsKey is used to check for a partition value
reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id)));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
reorderedFields.add(reader);
types.add(typesById.get(id));
} else {
reorderedFields.add(ParquetValueReaders.nulls());
types.add(null);
}
}
}
return new TupleReader(types, reorderedFields);
}
@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();
int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
Type elementType = repeated.getType(0);
int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1;
return new ArrayReader<>(repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader));
}
@Override
public ParquetValueReader<?> map(
Types.MapType expectedMap, GroupType map, ParquetValueReader<?> keyReader, ParquetValueReader<?> valueReader) {
GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();
int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1;
int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1;
Type keyType = repeatedKeyValue.getType(0);
int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1;
Type valueType = repeatedKeyValue.getType(1);
int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1;
return new MapReader<>(repeatedD, repeatedR,
ParquetValueReaders.option(keyType, keyD, keyReader),
ParquetValueReaders.option(valueType, valueD, valueReader));
}
@Override
public ParquetValueReader<?> primitive(
org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
ColumnDescriptor desc = type.getColumnDescription(currentPath());
if (primitive.getOriginalType() != null) {
switch (primitive.getOriginalType()) {
case ENUM:
case JSON:
case UTF8:
return new StringReader(desc);
case DATE:
return new DateReader(desc);
case INT_8:
case INT_16:
case INT_32:
if (expected != null && expected.typeId() == TypeID.LONG) {
return new IntAsLongReader(desc);
} else {
return new UnboxedReader<>(desc);
}
case INT_64: return new UnboxedReader<>(desc);
case TIMESTAMP_MILLIS: return new TimestampMillisReader(desc);
case TIMESTAMP_MICROS: return new TimestampMicrosReader(desc);
case DECIMAL:
DecimalMetadata decimal = primitive.getDecimalMetadata();
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new BinaryAsDecimalReader(desc, decimal.getScale());
case INT32:
return new IntegerAsDecimalReader(desc, decimal.getScale());
case INT64:
return new LongAsDecimalReader(desc, decimal.getScale());
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive.getOriginalType());
}
}
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return new BytesReader(desc);
case INT32:
if (expected != null && expected.typeId() == TypeID.LONG) {
return new IntAsLongReader(desc);
} else {
return new UnboxedReader<>(desc);
}
case FLOAT:
if (expected != null && expected.typeId() == TypeID.DOUBLE) {
return new FloatAsDoubleReader(desc);
} else {
return new UnboxedReader<>(desc);
}
case BOOLEAN:
case INT64:
case DOUBLE:
return new UnboxedReader<>(desc);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
}
}
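/** Reads Parquet DATE values (days since the Unix epoch) as {@code yyyy-MM-dd} strings. */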
private static class DateReader extends PrimitiveReader<String> {
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
DateReader(ColumnDescriptor desc) {
super(desc);
}
@Override
public String read(String reuse) {
OffsetDateTime day = EPOCH.plusDays(column.nextInteger());
return String.format("%04d-%02d-%02d", day.getYear(), day.getMonth().getValue(), day.getDayOfMonth());
}
}
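/** Reads BINARY and FIXED_LEN_BYTE_ARRAY values as Pig {@link DataByteArray}s. */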
private static class BytesReader extends PrimitiveReader<DataByteArray> {
BytesReader(ColumnDescriptor desc) {
super(desc);
}
@Override
public DataByteArray read(DataByteArray reuse) {
byte[] bytes = column.nextBinary().getBytes();
return new DataByteArray(bytes);
}
}
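/** Reads TIMESTAMP_MICROS values as ISO-8601 strings in UTC. */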
private static class TimestampMicrosReader extends UnboxedReader<String> {
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
TimestampMicrosReader(ColumnDescriptor desc) {
super(desc);
}
@Override
public String read(String ignored) {
return ChronoUnit.MICROS.addTo(EPOCH, column.nextLong()).toString();
}
}
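/** Reads TIMESTAMP_MILLIS values as ISO-8601 strings in UTC. */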
private static class TimestampMillisReader extends UnboxedReader<String> {
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}
@Override
public String read(String ignored) {
return ChronoUnit.MILLIS.addTo(EPOCH, column.nextLong()).toString();
}
}
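/** Reads Parquet maps into {@link LinkedHashMap}s, preserving the order in which keys are read. */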
private static class MapReader<K, V> extends RepeatedKeyValueReader<Map<K, V>, Map<K, V>, K, V> {
private final ReusableEntry<K, V> nullEntry = new ReusableEntry<>();
MapReader(int definitionLevel, int repetitionLevel,
ParquetValueReader<K> keyReader, ParquetValueReader<V> valueReader) {
super(definitionLevel, repetitionLevel, keyReader, valueReader);
}
@Override
protected Map<K, V> newMapData(Map<K, V> reuse) {
return new LinkedHashMap<>();
}
@Override
protected Map.Entry<K, V> getPair(Map<K, V> reuse) {
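// always hand back the same empty entry; previously read keys and values are not reused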
return nullEntry;
}
@Override
protected void addPair(Map<K, V> map, K key, V value) {
map.put(key, value);
}
@Override
protected Map<K, V> buildMap(Map<K, V> map) {
return map;
}
}
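/** Reads Parquet lists into Pig {@link DataBag}s, wrapping each element in a single-field tuple. */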
private static class ArrayReader<T> extends RepeatedReader<DataBag, DataBag, T> {
private final BagFactory bagFactory = BagFactory.getInstance();
private final TupleFactory tupleFactory = TupleFactory.getInstance();
ArrayReader(int definitionLevel, int repetitionLevel, ParquetValueReader<T> reader) {
super(definitionLevel, repetitionLevel, reader);
}
@Override
protected DataBag newListData(DataBag reuse) {
return bagFactory.newDefaultBag();
}
@Override
protected T getElement(DataBag list) {
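// bags do not expose elements for reuse, so each element is read into a fresh object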
return null;
}
@Override
protected void addElement(DataBag bag, T element) {
bag.add(tupleFactory.newTuple(element));
}
@Override
protected DataBag buildList(DataBag bag) {
return bag;
}
}
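/** Assembles field values into a fixed-size Pig {@link Tuple}. */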
private static class TupleReader extends StructReader<Tuple, Tuple> {
private static final TupleFactory TF = TupleFactory.getInstance();
private final int numColumns;
TupleReader(List<Type> types, List<ParquetValueReader<?>> readers) {
super(types, readers);
this.numColumns = readers.size();
}
@Override
protected Tuple newStructData(Tuple reuse) {
return TF.newTuple(numColumns);
}
@Override
protected Object getField(Tuple tuple, int pos) {
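// a new tuple is created for each record, so there is no existing field value to reuse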
return null;
}
@Override
protected Tuple buildStruct(Tuple tuple) {
return tuple;
}
@Override
protected void set(Tuple tuple, int pos, Object value) {
try {
tuple.set(pos, value);
} catch (ExecException e) {
throw new RuntimeException(String.format("Error setting tuple value for pos: %d, value: %s", pos, value), e);
}
}
}
}