| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg; |
| |
| import static org.apache.iceberg.types.Types.NestedField.optional; |
| |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; |
| import org.apache.iceberg.types.Conversions; |
| import org.apache.iceberg.types.Type; |
| import org.apache.iceberg.types.Types; |
| |
| public class MetricsUtil { |
| |
| private MetricsUtil() {} |
| |
| /** |
| * Construct mapping relationship between column id to NaN value counts from input metrics and |
| * metrics config. |
| */ |
| public static Map<Integer, Long> createNanValueCounts( |
| Stream<FieldMetrics<?>> fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { |
| Preconditions.checkNotNull(metricsConfig, "metricsConfig is required"); |
| |
| if (fieldMetrics == null || inputSchema == null) { |
| return Maps.newHashMap(); |
| } |
| |
| return fieldMetrics |
| .filter( |
| metrics -> |
| metricsMode(inputSchema, metricsConfig, metrics.id()) != MetricsModes.None.get()) |
| .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nanValueCount)); |
| } |
| |
| /** Extract MetricsMode for the given field id from metrics config. */ |
| public static MetricsModes.MetricsMode metricsMode( |
| Schema inputSchema, MetricsConfig metricsConfig, int fieldId) { |
| Preconditions.checkNotNull(inputSchema, "inputSchema is required"); |
| Preconditions.checkNotNull(metricsConfig, "metricsConfig is required"); |
| |
| String columnName = inputSchema.findColumnName(fieldId); |
| return metricsConfig.columnMode(columnName); |
| } |
| |
| public static final List<ReadableMetricColDefinition> READABLE_METRIC_COLS = |
| ImmutableList.of( |
| new ReadableMetricColDefinition( |
| "column_size", |
| "Total size on disk", |
| DataFile.COLUMN_SIZES, |
| field -> Types.LongType.get(), |
| (file, field) -> |
| file.columnSizes() == null ? null : file.columnSizes().get(field.fieldId())), |
| new ReadableMetricColDefinition( |
| "value_count", |
| "Total count, including null and NaN", |
| DataFile.VALUE_COUNTS, |
| field -> Types.LongType.get(), |
| (file, field) -> |
| file.valueCounts() == null ? null : file.valueCounts().get(field.fieldId())), |
| new ReadableMetricColDefinition( |
| "null_value_count", |
| "Null value count", |
| DataFile.NULL_VALUE_COUNTS, |
| field -> Types.LongType.get(), |
| (file, field) -> |
| file.nullValueCounts() == null |
| ? null |
| : file.nullValueCounts().get(field.fieldId())), |
| new ReadableMetricColDefinition( |
| "nan_value_count", |
| "NaN value count", |
| DataFile.NAN_VALUE_COUNTS, |
| field -> Types.LongType.get(), |
| (file, field) -> |
| file.nanValueCounts() == null |
| ? null |
| : file.nanValueCounts().get(field.fieldId())), |
| new ReadableMetricColDefinition( |
| "lower_bound", |
| "Lower bound", |
| DataFile.LOWER_BOUNDS, |
| Types.NestedField::type, |
| (file, field) -> |
| file.lowerBounds() == null |
| ? null |
| : Conversions.fromByteBuffer( |
| field.type(), file.lowerBounds().get(field.fieldId()))), |
| new ReadableMetricColDefinition( |
| "upper_bound", |
| "Upper bound", |
| DataFile.UPPER_BOUNDS, |
| Types.NestedField::type, |
| (file, field) -> |
| file.upperBounds() == null |
| ? null |
| : Conversions.fromByteBuffer( |
| field.type(), file.upperBounds().get(field.fieldId())))); |
| |
| public static final String READABLE_METRICS = "readable_metrics"; |
| |
| /** |
| * Fixed definition of a readable metric column, ie a mapping of a raw metric to a readable metric |
| */ |
| public static class ReadableMetricColDefinition { |
| private final String name; |
| private final String doc; |
| private final Types.NestedField originalCol; |
| private final TypeFunction typeFunction; |
| private final MetricFunction metricFunction; |
| |
| public interface TypeFunction { |
| Type type(Types.NestedField originalCol); |
| } |
| |
| public interface MetricFunction { |
| Object metric(ContentFile<?> file, Types.NestedField originalCol); |
| } |
| |
| /** |
| * @param name column name |
| * @param doc column doc |
| * @param originalCol original (raw) metric column field on metadata table |
| * @param typeFunction function that returns the readable metric column type from original field |
| * type |
| * @param metricFunction function that returns readable metric from data file |
| */ |
| ReadableMetricColDefinition( |
| String name, |
| String doc, |
| Types.NestedField originalCol, |
| TypeFunction typeFunction, |
| MetricFunction metricFunction) { |
| this.name = name; |
| this.doc = doc; |
| this.originalCol = originalCol; |
| this.typeFunction = typeFunction; |
| this.metricFunction = metricFunction; |
| } |
| |
| Types.NestedField originalCol() { |
| return originalCol; |
| } |
| |
| Type colType(Types.NestedField field) { |
| return typeFunction.type(field); |
| } |
| |
| String name() { |
| return name; |
| } |
| |
| String doc() { |
| return doc; |
| } |
| |
| Object value(ContentFile<?> dataFile, Types.NestedField dataField) { |
| return metricFunction.metric(dataFile, dataField); |
| } |
| } |
| |
| /** A struct of readable metric values for a primitive column */ |
| public static class ReadableColMetricsStruct implements StructLike { |
| |
| private final String columnName; |
| private final Map<Integer, Integer> projectionMap; |
| private final Object[] metrics; |
| |
| public ReadableColMetricsStruct( |
| String columnName, Types.NestedField projection, Object... metrics) { |
| this.columnName = columnName; |
| this.projectionMap = readableMetricsProjection(projection); |
| this.metrics = metrics; |
| } |
| |
| @Override |
| public int size() { |
| return projectionMap.size(); |
| } |
| |
| @Override |
| public <T> T get(int pos, Class<T> javaClass) { |
| Object value = get(pos); |
| return value == null ? null : javaClass.cast(value); |
| } |
| |
| @Override |
| public <T> void set(int pos, T value) { |
| throw new UnsupportedOperationException("ReadableColMetricsStruct is read only"); |
| } |
| |
| private Object get(int pos) { |
| int projectedPos = projectionMap.get(pos); |
| return metrics[projectedPos]; |
| } |
| |
| /** Returns map of projected position to actual position of this struct's fields */ |
| private Map<Integer, Integer> readableMetricsProjection(Types.NestedField projection) { |
| Map<Integer, Integer> result = Maps.newHashMap(); |
| |
| Set<String> projectedFields = |
| Sets.newHashSet( |
| projection.type().asStructType().fields().stream() |
| .map(Types.NestedField::name) |
| .collect(Collectors.toSet())); |
| |
| int projectedIndex = 0; |
| for (int fieldIndex = 0; fieldIndex < READABLE_METRIC_COLS.size(); fieldIndex++) { |
| ReadableMetricColDefinition readableMetric = READABLE_METRIC_COLS.get(fieldIndex); |
| |
| if (projectedFields.contains(readableMetric.name())) { |
| result.put(projectedIndex, fieldIndex); |
| projectedIndex++; |
| } |
| } |
| return result; |
| } |
| |
| String columnName() { |
| return columnName; |
| } |
| } |
| |
| /** |
| * A struct, consisting of all {@link ReadableColMetricsStruct} for all primitive columns of the |
| * table |
| */ |
| public static class ReadableMetricsStruct implements StructLike { |
| |
| private final List<StructLike> columnMetrics; |
| |
| public ReadableMetricsStruct(List<StructLike> columnMetrics) { |
| this.columnMetrics = columnMetrics; |
| } |
| |
| @Override |
| public int size() { |
| return columnMetrics.size(); |
| } |
| |
| @Override |
| public <T> T get(int pos, Class<T> javaClass) { |
| return javaClass.cast(columnMetrics.get(pos)); |
| } |
| |
| @Override |
| public <T> void set(int pos, T value) { |
| throw new UnsupportedOperationException("ReadableMetricsStruct is read only"); |
| } |
| } |
| |
| /** |
| * Calculates a dynamic schema for readable_metrics to add to metadata tables. The type will be |
| * the struct {@link ReadableColMetricsStruct}, composed of {@link ReadableMetricsStruct} for all |
| * primitive columns in the data table |
| * |
| * @param dataTableSchema schema of data table |
| * @param metadataTableSchema schema of existing metadata table (to ensure id uniqueness) |
| * @return schema of readable_metrics struct |
| */ |
| public static Schema readableMetricsSchema(Schema dataTableSchema, Schema metadataTableSchema) { |
| List<Types.NestedField> fields = Lists.newArrayList(); |
| Map<Integer, String> idToName = dataTableSchema.idToName(); |
| AtomicInteger nextId = new AtomicInteger(metadataTableSchema.highestFieldId()); |
| |
| for (int id : idToName.keySet()) { |
| Types.NestedField field = dataTableSchema.findField(id); |
| |
| if (field.type().isPrimitiveType()) { |
| String colName = idToName.get(id); |
| |
| fields.add( |
| Types.NestedField.of( |
| nextId.incrementAndGet(), |
| true, |
| colName, |
| Types.StructType.of( |
| READABLE_METRIC_COLS.stream() |
| .map( |
| m -> |
| optional( |
| nextId.incrementAndGet(), m.name(), m.colType(field), m.doc())) |
| .collect(Collectors.toList())), |
| String.format("Metrics for column %s", colName))); |
| } |
| } |
| |
| fields.sort(Comparator.comparing(Types.NestedField::name)); |
| return new Schema( |
| optional( |
| nextId.incrementAndGet(), |
| "readable_metrics", |
| Types.StructType.of(fields), |
| "Column metrics in readable form")); |
| } |
| |
| /** |
| * Return a readable metrics struct row from file metadata |
| * |
| * @param schema schema of original data table |
| * @param file content file with metrics |
| * @param projectedSchema user requested projection |
| * @return {@link ReadableMetricsStruct} |
| */ |
| public static ReadableMetricsStruct readableMetricsStruct( |
| Schema schema, ContentFile<?> file, Types.StructType projectedSchema) { |
| Map<Integer, String> idToName = schema.idToName(); |
| List<ReadableColMetricsStruct> colMetrics = Lists.newArrayList(); |
| |
| for (int id : idToName.keySet()) { |
| String qualifiedName = idToName.get(id); |
| Types.NestedField field = schema.findField(id); |
| |
| Object[] metrics = |
| READABLE_METRIC_COLS.stream() |
| .map(readableMetric -> readableMetric.value(file, field)) |
| .toArray(); |
| |
| if (field.type().isPrimitiveType() // Iceberg stores metrics only for primitive types |
| && projectedSchema.field(qualifiedName) |
| != null) { // User has requested this column metric |
| colMetrics.add( |
| new ReadableColMetricsStruct( |
| qualifiedName, projectedSchema.field(qualifiedName), metrics)); |
| } |
| } |
| |
| colMetrics.sort(Comparator.comparing(ReadableColMetricsStruct::columnName)); |
| return new ReadableMetricsStruct( |
| colMetrics.stream().map(m -> (StructLike) m).collect(Collectors.toList())); |
| } |
| } |