| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg.parquet; |
| |
| import java.lang.reflect.Array; |
| import java.math.BigDecimal; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.function.Function; |
| import java.util.stream.Stream; |
| import org.apache.avro.util.Utf8; |
| import org.apache.iceberg.FieldMetrics; |
| import org.apache.iceberg.FloatFieldMetrics; |
| import org.apache.iceberg.deletes.PositionDelete; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; |
| import org.apache.iceberg.types.TypeUtil; |
| import org.apache.iceberg.util.DecimalUtil; |
| import org.apache.parquet.column.ColumnDescriptor; |
| import org.apache.parquet.column.ColumnWriteStore; |
| import org.apache.parquet.io.api.Binary; |
| import org.apache.parquet.schema.Type; |
| |
| public class ParquetValueWriters { |
  // Utility class: static factory methods only, not meant to be instantiated.
  private ParquetValueWriters() {
  }
| |
| public static <T> ParquetValueWriter<T> option(Type type, |
| int definitionLevel, |
| ParquetValueWriter<T> writer) { |
| if (type.isRepetition(Type.Repetition.OPTIONAL)) { |
| return new OptionWriter<>(definitionLevel, writer); |
| } |
| |
| return writer; |
| } |
| |
  /** Returns a writer for boolean columns. */
  public static UnboxedWriter<Boolean> booleans(ColumnDescriptor desc) {
    return new UnboxedWriter<>(desc);
  }

  /** Returns a writer for tinyint columns; byte values are widened and written as ints. */
  public static UnboxedWriter<Byte> tinyints(ColumnDescriptor desc) {
    return new ByteWriter(desc);
  }

  /** Returns a writer for smallint columns; short values are widened and written as ints. */
  public static UnboxedWriter<Short> shorts(ColumnDescriptor desc) {
    return new ShortWriter(desc);
  }

  /** Returns a writer for int columns. */
  public static UnboxedWriter<Integer> ints(ColumnDescriptor desc) {
    return new UnboxedWriter<>(desc);
  }

  /** Returns a writer for long columns. */
  public static UnboxedWriter<Long> longs(ColumnDescriptor desc) {
    return new UnboxedWriter<>(desc);
  }

  /** Returns a writer for float columns that also counts NaN values for field metrics. */
  public static UnboxedWriter<Float> floats(ColumnDescriptor desc) {
    return new FloatWriter(desc);
  }

  /** Returns a writer for double columns that also counts NaN values for field metrics. */
  public static UnboxedWriter<Double> doubles(ColumnDescriptor desc) {
    return new DoubleWriter(desc);
  }

  /** Returns a writer for string columns that accepts any CharSequence, including Avro Utf8. */
  public static PrimitiveWriter<CharSequence> strings(ColumnDescriptor desc) {
    return new StringWriter(desc);
  }
| |
  /** Returns a writer that stores a decimal's unscaled value in an int column. */
  public static PrimitiveWriter<BigDecimal> decimalAsInteger(ColumnDescriptor desc,
                                                             int precision, int scale) {
    return new IntegerDecimalWriter(desc, precision, scale);
  }

  /** Returns a writer that stores a decimal's unscaled value in a long column. */
  public static PrimitiveWriter<BigDecimal> decimalAsLong(ColumnDescriptor desc,
                                                          int precision, int scale) {
    return new LongDecimalWriter(desc, precision, scale);
  }

  /** Returns a writer that stores a decimal's unscaled value as fixed-length big-endian bytes. */
  public static PrimitiveWriter<BigDecimal> decimalAsFixed(ColumnDescriptor desc,
                                                           int precision, int scale) {
    return new FixedDecimalWriter(desc, precision, scale);
  }

  /** Returns a writer for binary columns backed by ByteBuffers. */
  public static PrimitiveWriter<ByteBuffer> byteBuffers(ColumnDescriptor desc) {
    return new BytesWriter(desc);
  }
| |
  /** Returns a writer for repeated (list) fields that writes each element with the given writer. */
  public static <E> CollectionWriter<E> collections(int dl, int rl, ParquetValueWriter<E> writer) {
    return new CollectionWriter<>(dl, rl, writer);
  }

  /** Returns a writer for map fields that writes each entry with the given key and value writers. */
  public static <K, V> MapWriter<K, V> maps(int dl, int rl,
                                            ParquetValueWriter<K> keyWriter,
                                            ParquetValueWriter<V> valueWriter) {
    return new MapWriter<>(dl, rl, keyWriter, valueWriter);
  }
| |
  /**
   * Base writer for primitive values backed by a single Parquet column.
   *
   * <p>Holds exactly one {@link ColumnWriter} and exposes it as this writer's only column.
   */
  public abstract static class PrimitiveWriter<T> implements ParquetValueWriter<T> {
    @SuppressWarnings("checkstyle:VisibilityModifier")
    protected final ColumnWriter<T> column;
    // single-element list containing this writer's column; returned by columns()
    private final List<TripleWriter<?>> children;

    protected PrimitiveWriter(ColumnDescriptor desc) {
      this.column = ColumnWriter.newWriter(desc);
      this.children = ImmutableList.of(column);
    }

    @Override
    public void write(int repetitionLevel, T value) {
      column.write(repetitionLevel, value);
    }

    @Override
    public List<TripleWriter<?>> columns() {
      return children;
    }

    @Override
    public void setColumnStore(ColumnWriteStore columnStore) {
      this.column.setColumnStore(columnStore);
    }
  }
| |
  /**
   * Writer that exposes primitive-typed write methods so values can be written without boxing.
   *
   * <p>The boxed {@code write(int, T)} inherited from {@link PrimitiveWriter} still works; these
   * overloads delegate directly to the column's primitive write methods.
   */
  private static class UnboxedWriter<T> extends PrimitiveWriter<T> {
    private UnboxedWriter(ColumnDescriptor desc) {
      super(desc);
    }

    public void writeBoolean(int repetitionLevel, boolean value) {
      column.writeBoolean(repetitionLevel, value);
    }

    public void writeInteger(int repetitionLevel, int value) {
      column.writeInteger(repetitionLevel, value);
    }

    public void writeLong(int repetitionLevel, long value) {
      column.writeLong(repetitionLevel, value);
    }

    public void writeFloat(int repetitionLevel, float value) {
      column.writeFloat(repetitionLevel, value);
    }

    public void writeDouble(int repetitionLevel, double value) {
      column.writeDouble(repetitionLevel, value);
    }
  }
| |
| private static class FloatWriter extends UnboxedWriter<Float> { |
| private final int id; |
| private long nanCount; |
| |
| private FloatWriter(ColumnDescriptor desc) { |
| super(desc); |
| this.id = desc.getPrimitiveType().getId().intValue(); |
| this.nanCount = 0; |
| } |
| |
| @Override |
| public void write(int repetitionLevel, Float value) { |
| writeFloat(repetitionLevel, value); |
| if (Float.isNaN(value)) { |
| nanCount++; |
| } |
| } |
| |
| @Override |
| public Stream<FieldMetrics> metrics() { |
| return Stream.of(new FloatFieldMetrics(id, nanCount)); |
| } |
| } |
| |
| private static class DoubleWriter extends UnboxedWriter<Double> { |
| private final int id; |
| private long nanCount; |
| |
| private DoubleWriter(ColumnDescriptor desc) { |
| super(desc); |
| this.id = desc.getPrimitiveType().getId().intValue(); |
| this.nanCount = 0; |
| } |
| |
| @Override |
| public void write(int repetitionLevel, Double value) { |
| writeDouble(repetitionLevel, value); |
| if (Double.isNaN(value)) { |
| nanCount++; |
| } |
| } |
| |
| @Override |
| public Stream<FieldMetrics> metrics() { |
| return Stream.of(new FloatFieldMetrics(id, nanCount)); |
| } |
| } |
| |
| private static class ByteWriter extends UnboxedWriter<Byte> { |
| private ByteWriter(ColumnDescriptor desc) { |
| super(desc); |
| } |
| |
| @Override |
| public void write(int repetitionLevel, Byte value) { |
| writeInteger(repetitionLevel, value.intValue()); |
| } |
| } |
| |
| private static class ShortWriter extends UnboxedWriter<Short> { |
| private ShortWriter(ColumnDescriptor desc) { |
| super(desc); |
| } |
| |
| @Override |
| public void write(int repetitionLevel, Short value) { |
| writeInteger(repetitionLevel, value.intValue()); |
| } |
| } |
| |
  /**
   * Writes a {@link BigDecimal} as its unscaled value in an int column.
   *
   * <p>The value's scale must equal the declared scale and its precision must not exceed the
   * declared precision; otherwise an IllegalArgumentException is thrown.
   */
  private static class IntegerDecimalWriter extends PrimitiveWriter<BigDecimal> {
    private final int precision;
    private final int scale;

    private IntegerDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
      super(desc);
      this.precision = precision;
      this.scale = scale;
    }

    @Override
    public void write(int repetitionLevel, BigDecimal decimal) {
      Preconditions.checkArgument(decimal.scale() == scale,
          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
      Preconditions.checkArgument(decimal.precision() <= precision,
          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);

      // assumes callers choose this writer only for precisions whose unscaled value fits in an
      // int — TODO confirm against the caller's precision-to-type mapping
      column.writeInteger(repetitionLevel, decimal.unscaledValue().intValue());
    }
  }
| |
  /**
   * Writes a {@link BigDecimal} as its unscaled value in a long column.
   *
   * <p>The value's scale must equal the declared scale and its precision must not exceed the
   * declared precision; otherwise an IllegalArgumentException is thrown.
   */
  private static class LongDecimalWriter extends PrimitiveWriter<BigDecimal> {
    private final int precision;
    private final int scale;

    private LongDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
      super(desc);
      this.precision = precision;
      this.scale = scale;
    }

    @Override
    public void write(int repetitionLevel, BigDecimal decimal) {
      Preconditions.checkArgument(decimal.scale() == scale,
          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, decimal);
      Preconditions.checkArgument(decimal.precision() <= precision,
          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, decimal);

      // assumes callers choose this writer only for precisions whose unscaled value fits in a
      // long — TODO confirm against the caller's precision-to-type mapping
      column.writeLong(repetitionLevel, decimal.unscaledValue().longValue());
    }
  }
| |
  /**
   * Writes a {@link BigDecimal} as fixed-length bytes sized for the declared precision.
   */
  private static class FixedDecimalWriter extends PrimitiveWriter<BigDecimal> {
    private final int precision;
    private final int scale;
    // per-thread reusable buffer to avoid allocating a new byte[] for every value written
    private final ThreadLocal<byte[]> bytes;

    private FixedDecimalWriter(ColumnDescriptor desc, int precision, int scale) {
      super(desc);
      this.precision = precision;
      this.scale = scale;
      this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]);
    }

    @Override
    public void write(int repetitionLevel, BigDecimal decimal) {
      // fill the reused buffer with the fixed-length representation, then wrap without copying
      byte[] binary = DecimalUtil.toReusedFixLengthBytes(precision, scale, decimal, bytes.get());
      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(binary));
    }
  }
| |
  /**
   * Writes a {@link ByteBuffer} to a binary column.
   *
   * <p>NOTE(review): uses {@code Binary.fromReusedByteBuffer}, which signals the buffer's
   * contents may be reused by the caller — presumably the column writer copies eagerly;
   * verify against the Parquet Binary contract.
   */
  private static class BytesWriter extends PrimitiveWriter<ByteBuffer> {
    private BytesWriter(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public void write(int repetitionLevel, ByteBuffer buffer) {
      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
    }
  }
| |
| private static class StringWriter extends PrimitiveWriter<CharSequence> { |
| private StringWriter(ColumnDescriptor desc) { |
| super(desc); |
| } |
| |
| @Override |
| public void write(int repetitionLevel, CharSequence value) { |
| if (value instanceof Utf8) { |
| Utf8 utf8 = (Utf8) value; |
| column.writeBinary(repetitionLevel, |
| Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength())); |
| } else { |
| column.writeBinary(repetitionLevel, Binary.fromString(value.toString())); |
| } |
| } |
| } |
| |
  /**
   * Writer for optional fields that handles null values.
   *
   * <p>Non-null values are delegated to the wrapped writer. For a null, each underlying column
   * gets a null written at {@code definitionLevel - 1}: the level just above this field, where
   * the value is absent.
   */
  static class OptionWriter<T> implements ParquetValueWriter<T> {
    private final int definitionLevel;
    private final ParquetValueWriter<T> writer;
    private final List<TripleWriter<?>> children;

    OptionWriter(int definitionLevel, ParquetValueWriter<T> writer) {
      this.definitionLevel = definitionLevel;
      this.writer = writer;
      this.children = writer.columns();
    }

    @Override
    public void write(int repetitionLevel, T value) {
      if (value != null) {
        writer.write(repetitionLevel, value);

      } else {
        // null must still be recorded in every leaf column of the wrapped writer
        for (TripleWriter<?> column : children) {
          column.writeNull(repetitionLevel, definitionLevel - 1);
        }
      }
    }

    @Override
    public List<TripleWriter<?>> columns() {
      return children;
    }

    @Override
    public void setColumnStore(ColumnWriteStore columnStore) {
      writer.setColumnStore(columnStore);
    }

    @Override
    public Stream<FieldMetrics> metrics() {
      return writer.metrics();
    }
  }
| |
  /**
   * Base writer for repeated (list-like) values.
   *
   * <p>An empty list is recorded as a null at {@code definitionLevel - 1} in every underlying
   * column. Otherwise the first element is written at the parent's repetition level and each
   * subsequent element at this writer's repetition level, which is how repeated values are
   * delimited in Parquet's Dremel-style encoding.
   *
   * @param <L> the list-like type written by this writer
   * @param <E> the element type
   */
  public abstract static class RepeatedWriter<L, E> implements ParquetValueWriter<L> {
    private final int definitionLevel;
    private final int repetitionLevel;
    private final ParquetValueWriter<E> writer;
    private final List<TripleWriter<?>> children;

    protected RepeatedWriter(int definitionLevel, int repetitionLevel,
                             ParquetValueWriter<E> writer) {
      this.definitionLevel = definitionLevel;
      this.repetitionLevel = repetitionLevel;
      this.writer = writer;
      this.children = writer.columns();
    }

    @Override
    public void write(int parentRepetition, L value) {
      Iterator<E> elements = elements(value);

      if (!elements.hasNext()) {
        // write the empty list to each column
        // TODO: make sure this definition level is correct
        for (TripleWriter<?> column : children) {
          column.writeNull(parentRepetition, definitionLevel - 1);
        }

      } else {
        boolean first = true;
        while (elements.hasNext()) {
          E element = elements.next();

          // only the first element carries the parent's repetition level; later elements use
          // this writer's level to signal continuation of the same list
          int rl = repetitionLevel;
          if (first) {
            rl = parentRepetition;
            first = false;
          }

          writer.write(rl, element);
        }
      }
    }

    @Override
    public List<TripleWriter<?>> columns() {
      return children;
    }

    @Override
    public void setColumnStore(ColumnWriteStore columnStore) {
      writer.setColumnStore(columnStore);
    }

    /** Returns an iterator over the elements of the given list-like value. */
    protected abstract Iterator<E> elements(L value);

    @Override
    public Stream<FieldMetrics> metrics() {
      return writer.metrics();
    }
  }
| |
  /** {@link RepeatedWriter} for Java {@link Collection} values. */
  private static class CollectionWriter<E> extends RepeatedWriter<Collection<E>, E> {
    private CollectionWriter(int definitionLevel, int repetitionLevel,
                             ParquetValueWriter<E> writer) {
      super(definitionLevel, repetitionLevel, writer);
    }

    @Override
    protected Iterator<E> elements(Collection<E> list) {
      return list.iterator();
    }
  }
| |
  /**
   * Base writer for map-like values written as repeated key/value pairs.
   *
   * <p>An empty map is recorded as a null at {@code definitionLevel - 1} in every underlying
   * column. Otherwise the first pair is written at the parent's repetition level and each
   * subsequent pair at this writer's repetition level, mirroring {@link RepeatedWriter}.
   *
   * @param <M> the map-like type written by this writer
   * @param <K> the key type
   * @param <V> the value type
   */
  public abstract static class RepeatedKeyValueWriter<M, K, V> implements ParquetValueWriter<M> {
    private final int definitionLevel;
    private final int repetitionLevel;
    private final ParquetValueWriter<K> keyWriter;
    private final ParquetValueWriter<V> valueWriter;
    // key columns followed by value columns
    private final List<TripleWriter<?>> children;

    protected RepeatedKeyValueWriter(int definitionLevel, int repetitionLevel,
                                     ParquetValueWriter<K> keyWriter,
                                     ParquetValueWriter<V> valueWriter) {
      this.definitionLevel = definitionLevel;
      this.repetitionLevel = repetitionLevel;
      this.keyWriter = keyWriter;
      this.valueWriter = valueWriter;
      this.children = ImmutableList.<TripleWriter<?>>builder()
          .addAll(keyWriter.columns())
          .addAll(valueWriter.columns())
          .build();
    }

    @Override
    public void write(int parentRepetition, M value) {
      Iterator<Map.Entry<K, V>> pairs = pairs(value);

      if (!pairs.hasNext()) {
        // write the empty map to each column
        for (TripleWriter<?> column : children) {
          column.writeNull(parentRepetition, definitionLevel - 1);
        }

      } else {
        boolean first = true;
        while (pairs.hasNext()) {
          Map.Entry<K, V> pair = pairs.next();

          // only the first pair carries the parent's repetition level; later pairs use this
          // writer's level to signal continuation of the same map
          int rl = repetitionLevel;
          if (first) {
            rl = parentRepetition;
            first = false;
          }

          keyWriter.write(rl, pair.getKey());
          valueWriter.write(rl, pair.getValue());
        }
      }
    }

    @Override
    public List<TripleWriter<?>> columns() {
      return children;
    }

    @Override
    public void setColumnStore(ColumnWriteStore columnStore) {
      keyWriter.setColumnStore(columnStore);
      valueWriter.setColumnStore(columnStore);
    }

    /** Returns an iterator over the entries of the given map-like value. */
    protected abstract Iterator<Map.Entry<K, V>> pairs(M value);

    @Override
    public Stream<FieldMetrics> metrics() {
      return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
    }
  }
| |
  /** {@link RepeatedKeyValueWriter} for Java {@link Map} values. */
  private static class MapWriter<K, V> extends RepeatedKeyValueWriter<Map<K, V>, K, V> {
    private MapWriter(int definitionLevel, int repetitionLevel,
                      ParquetValueWriter<K> keyWriter, ParquetValueWriter<V> valueWriter) {
      super(definitionLevel, repetitionLevel, keyWriter, valueWriter);
    }

    @Override
    protected Iterator<Map.Entry<K, V>> pairs(Map<K, V> map) {
      return map.entrySet().iterator();
    }
  }
| |
  /**
   * Base writer for struct-like values that writes each field with its positional writer.
   *
   * <p>Subclasses implement {@link #get(Object, int)} to extract field values by position.
   *
   * @param <S> the struct type written by this writer
   */
  public abstract static class StructWriter<S> implements ParquetValueWriter<S> {
    private final ParquetValueWriter<Object>[] writers;
    private final List<TripleWriter<?>> children;

    @SuppressWarnings("unchecked")
    protected StructWriter(List<ParquetValueWriter<?>> writers) {
      // generic arrays cannot be created directly; use reflection plus an unchecked cast
      this.writers = (ParquetValueWriter<Object>[]) Array.newInstance(
          ParquetValueWriter.class, writers.size());

      ImmutableList.Builder<TripleWriter<?>> columnsBuilder = ImmutableList.builder();
      for (int i = 0; i < writers.size(); i += 1) {
        ParquetValueWriter<?> writer = writers.get(i);
        this.writers[i] = (ParquetValueWriter<Object>) writer;
        columnsBuilder.addAll(writer.columns());
      }

      this.children = columnsBuilder.build();
    }

    @Override
    public void write(int repetitionLevel, S value) {
      // write every field at the same repetition level as the struct itself
      for (int i = 0; i < writers.length; i += 1) {
        Object fieldValue = get(value, i);
        writers[i].write(repetitionLevel, fieldValue);
      }
    }

    @Override
    public List<TripleWriter<?>> columns() {
      return children;
    }

    @Override
    public void setColumnStore(ColumnWriteStore columnStore) {
      for (ParquetValueWriter<?> writer : writers) {
        writer.setColumnStore(columnStore);
      }
    }

    /** Returns the field value at the given position of the struct. */
    protected abstract Object get(S struct, int index);

    @Override
    public Stream<FieldMetrics> metrics() {
      return Arrays.stream(writers).flatMap(ParquetValueWriter::metrics);
    }
  }
| |
  /**
   * Writer for {@link PositionDelete} records that reuses the field writers of an existing
   * struct writer.
   *
   * <p>Field positions follow the position delete layout used here: the file path (index 0,
   * converted with {@code pathTransformFunc} before writing), the position (index 1), and the
   * deleted row (index 2).
   *
   * @param <R> the type of the deleted row
   */
  public static class PositionDeleteStructWriter<R> extends StructWriter<PositionDelete<R>> {
    // converts the delete's path into the representation expected by the path column writer
    private final Function<CharSequence, ?> pathTransformFunc;

    public PositionDeleteStructWriter(StructWriter<?> replacedWriter, Function<CharSequence, ?> pathTransformFunc) {
      super(Arrays.asList(replacedWriter.writers));
      this.pathTransformFunc = pathTransformFunc;
    }

    @Override
    protected Object get(PositionDelete<R> delete, int index) {
      switch (index) {
        case 0:
          return pathTransformFunc.apply(delete.path());
        case 1:
          return delete.pos();
        case 2:
          return delete.row();
      }
      throw new IllegalArgumentException("Cannot get value for invalid index: " + index);
    }
  }
| } |