| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg.parquet; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Stream; |
| import org.apache.iceberg.FieldMetrics; |
| import org.apache.iceberg.Metrics; |
| import org.apache.iceberg.MetricsConfig; |
| import org.apache.iceberg.MetricsModes; |
| import org.apache.iceberg.MetricsModes.MetricsMode; |
| import org.apache.iceberg.MetricsUtil; |
| import org.apache.iceberg.Schema; |
| import org.apache.iceberg.exceptions.RuntimeIOException; |
| import org.apache.iceberg.expressions.Literal; |
| import org.apache.iceberg.io.InputFile; |
| import org.apache.iceberg.mapping.NameMapping; |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.apache.iceberg.types.Conversions; |
| import org.apache.iceberg.types.Type; |
| import org.apache.iceberg.types.Types; |
| import org.apache.iceberg.util.BinaryUtil; |
| import org.apache.iceberg.util.UnicodeUtil; |
| import org.apache.parquet.column.ColumnDescriptor; |
| import org.apache.parquet.column.Dictionary; |
| import org.apache.parquet.column.Encoding; |
| import org.apache.parquet.column.EncodingStats; |
| import org.apache.parquet.column.page.DictionaryPage; |
| import org.apache.parquet.column.page.PageReader; |
| import org.apache.parquet.column.statistics.Statistics; |
| import org.apache.parquet.hadoop.ParquetFileReader; |
| import org.apache.parquet.hadoop.metadata.BlockMetaData; |
| import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; |
| import org.apache.parquet.hadoop.metadata.ColumnPath; |
| import org.apache.parquet.hadoop.metadata.ParquetMetadata; |
| import org.apache.parquet.io.ParquetDecodingException; |
| import org.apache.parquet.schema.MessageType; |
| import org.apache.parquet.schema.PrimitiveType; |
| |
| public class ParquetUtil { |
| // not meant to be instantiated |
| private ParquetUtil() { |
| } |
| |
| public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) { |
| return fileMetrics(file, metricsConfig, null); |
| } |
| |
| public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) { |
| try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) { |
| return footerMetrics(reader.getFooter(), Stream.empty(), metricsConfig, nameMapping); |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to read footer of file: %s", file); |
| } |
| } |
| |
| public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics> fieldMetrics, |
| MetricsConfig metricsConfig) { |
| return footerMetrics(metadata, fieldMetrics, metricsConfig, null); |
| } |
| |
| public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics> fieldMetrics, |
| MetricsConfig metricsConfig, NameMapping nameMapping) { |
| long rowCount = 0; |
| Map<Integer, Long> columnSizes = Maps.newHashMap(); |
| Map<Integer, Long> valueCounts = Maps.newHashMap(); |
| Map<Integer, Long> nullValueCounts = Maps.newHashMap(); |
| Map<Integer, Literal<?>> lowerBounds = Maps.newHashMap(); |
| Map<Integer, Literal<?>> upperBounds = Maps.newHashMap(); |
| Set<Integer> missingStats = Sets.newHashSet(); |
| |
| // ignore metrics for fields we failed to determine reliable IDs |
| MessageType parquetTypeWithIds = getParquetTypeWithIds(metadata, nameMapping); |
| Schema fileSchema = ParquetSchemaUtil.convertAndPrune(parquetTypeWithIds); |
| |
| List<BlockMetaData> blocks = metadata.getBlocks(); |
| for (BlockMetaData block : blocks) { |
| rowCount += block.getRowCount(); |
| for (ColumnChunkMetaData column : block.getColumns()) { |
| |
| Integer fieldId = fileSchema.aliasToId(column.getPath().toDotString()); |
| if (fieldId == null) { |
| // fileSchema may contain a subset of columns present in the file |
| // as we prune columns we could not assign ids |
| continue; |
| } |
| |
| increment(columnSizes, fieldId, column.getTotalSize()); |
| |
| MetricsMode metricsMode = MetricsUtil.metricsMode(fileSchema, metricsConfig, fieldId); |
| if (metricsMode == MetricsModes.None.get()) { |
| continue; |
| } |
| increment(valueCounts, fieldId, column.getValueCount()); |
| |
| Statistics stats = column.getStatistics(); |
| if (stats == null) { |
| missingStats.add(fieldId); |
| } else if (!stats.isEmpty()) { |
| increment(nullValueCounts, fieldId, stats.getNumNulls()); |
| |
| if (metricsMode != MetricsModes.Counts.get()) { |
| Types.NestedField field = fileSchema.findField(fieldId); |
| if (field != null && stats.hasNonNullValue() && shouldStoreBounds(column, fileSchema)) { |
| Literal<?> min = ParquetConversions.fromParquetPrimitive( |
| field.type(), column.getPrimitiveType(), stats.genericGetMin()); |
| updateMin(lowerBounds, fieldId, field.type(), min, metricsMode); |
| Literal<?> max = ParquetConversions.fromParquetPrimitive( |
| field.type(), column.getPrimitiveType(), stats.genericGetMax()); |
| updateMax(upperBounds, fieldId, field.type(), max, metricsMode); |
| } |
| } |
| } |
| } |
| } |
| |
| // discard accumulated values if any stats were missing |
| for (Integer fieldId : missingStats) { |
| nullValueCounts.remove(fieldId); |
| lowerBounds.remove(fieldId); |
| upperBounds.remove(fieldId); |
| } |
| |
| return new Metrics(rowCount, columnSizes, valueCounts, nullValueCounts, |
| MetricsUtil.createNanValueCounts(fieldMetrics, metricsConfig, fileSchema), |
| toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds)); |
| } |
| |
| private static MessageType getParquetTypeWithIds(ParquetMetadata metadata, NameMapping nameMapping) { |
| MessageType type = metadata.getFileMetaData().getSchema(); |
| |
| if (ParquetSchemaUtil.hasIds(type)) { |
| return type; |
| } |
| |
| if (nameMapping != null) { |
| return ParquetSchemaUtil.applyNameMapping(type, nameMapping); |
| } |
| |
| return ParquetSchemaUtil.addFallbackIds(type); |
| } |
| |
| /** |
| * Returns a list of offsets in ascending order determined by the starting position of the row groups. |
| */ |
| public static List<Long> getSplitOffsets(ParquetMetadata md) { |
| List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size()); |
| for (BlockMetaData blockMetaData : md.getBlocks()) { |
| splitOffsets.add(blockMetaData.getStartingPos()); |
| } |
| Collections.sort(splitOffsets); |
| return splitOffsets; |
| } |
| |
| // we allow struct nesting, but not maps or arrays |
| private static boolean shouldStoreBounds(ColumnChunkMetaData column, Schema schema) { |
| if (column.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { |
| // stats for INT96 are not reliable |
| return false; |
| } |
| |
| ColumnPath columnPath = column.getPath(); |
| Iterator<String> pathIterator = columnPath.iterator(); |
| Type currentType = schema.asStruct(); |
| |
| while (pathIterator.hasNext()) { |
| if (currentType == null || !currentType.isStructType()) { |
| return false; |
| } |
| String fieldName = pathIterator.next(); |
| currentType = currentType.asStructType().fieldType(fieldName); |
| } |
| |
| return currentType != null && currentType.isPrimitiveType(); |
| } |
| |
| private static void increment(Map<Integer, Long> columns, int fieldId, long amount) { |
| if (columns != null) { |
| if (columns.containsKey(fieldId)) { |
| columns.put(fieldId, columns.get(fieldId) + amount); |
| } else { |
| columns.put(fieldId, amount); |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type, |
| Literal<T> min, MetricsMode metricsMode) { |
| Literal<T> currentMin = (Literal<T>) lowerBounds.get(id); |
| if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) { |
| if (metricsMode == MetricsModes.Full.get()) { |
| lowerBounds.put(id, min); |
| } else { |
| MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode; |
| int truncateLength = truncateMode.length(); |
| switch (type.typeId()) { |
| case STRING: |
| lowerBounds.put(id, UnicodeUtil.truncateStringMin((Literal<CharSequence>) min, truncateLength)); |
| break; |
| case FIXED: |
| case BINARY: |
| lowerBounds.put(id, BinaryUtil.truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength)); |
| break; |
| default: |
| lowerBounds.put(id, min); |
| } |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type, |
| Literal<T> max, MetricsMode metricsMode) { |
| Literal<T> currentMax = (Literal<T>) upperBounds.get(id); |
| if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) { |
| if (metricsMode == MetricsModes.Full.get()) { |
| upperBounds.put(id, max); |
| } else { |
| MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode; |
| int truncateLength = truncateMode.length(); |
| switch (type.typeId()) { |
| case STRING: |
| upperBounds.put(id, UnicodeUtil.truncateStringMax((Literal<CharSequence>) max, truncateLength)); |
| break; |
| case FIXED: |
| case BINARY: |
| upperBounds.put(id, BinaryUtil.truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength)); |
| break; |
| default: |
| upperBounds.put(id, max); |
| } |
| } |
| } |
| } |
| |
| private static Map<Integer, ByteBuffer> toBufferMap(Schema schema, Map<Integer, Literal<?>> map) { |
| Map<Integer, ByteBuffer> bufferMap = Maps.newHashMap(); |
| for (Map.Entry<Integer, Literal<?>> entry : map.entrySet()) { |
| bufferMap.put(entry.getKey(), |
| Conversions.toByteBuffer(schema.findType(entry.getKey()), entry.getValue().value())); |
| } |
| return bufferMap; |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) { |
| EncodingStats stats = meta.getEncodingStats(); |
| if (stats != null) { |
| return stats.hasNonDictionaryEncodedPages(); |
| } |
| |
| // without EncodingStats, fall back to testing the encoding list |
| Set<Encoding> encodings = new HashSet<>(meta.getEncodings()); |
| if (encodings.remove(Encoding.PLAIN_DICTIONARY)) { |
| // if remove returned true, PLAIN_DICTIONARY was present, which means at |
| // least one page was dictionary encoded and 1.0 encodings are used |
| |
| // RLE and BIT_PACKED are only used for repetition or definition levels |
| encodings.remove(Encoding.RLE); |
| encodings.remove(Encoding.BIT_PACKED); |
| |
| // when empty, no encodings other than dictionary or rep/def levels |
| return !encodings.isEmpty(); |
| } else { |
| // if PLAIN_DICTIONARY wasn't present, then either the column is not |
| // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used. |
| // for 2.0, this cannot determine whether a page fell back without |
| // page encoding stats |
| return true; |
| } |
| } |
| |
| public static Dictionary readDictionary(ColumnDescriptor desc, PageReader pageSource) { |
| DictionaryPage dictionaryPage = pageSource.readDictionaryPage(); |
| if (dictionaryPage != null) { |
| try { |
| return dictionaryPage.getEncoding().initDictionary(desc, dictionaryPage); |
| } catch (IOException e) { |
| throw new ParquetDecodingException("could not decode the dictionary for " + desc, e); |
| } |
| } |
| return null; |
| } |
| |
| public static boolean isIntType(PrimitiveType primitiveType) { |
| if (primitiveType.getOriginalType() != null) { |
| switch (primitiveType.getOriginalType()) { |
| case INT_8: |
| case INT_16: |
| case INT_32: |
| case DATE: |
| return true; |
| default: |
| return false; |
| } |
| } |
| return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32; |
| } |
| } |