| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg; |
| |
| import java.io.IOException; |
| import java.math.BigDecimal; |
| import java.nio.ByteBuffer; |
| import java.nio.CharBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import org.apache.iceberg.data.GenericRecord; |
| import org.apache.iceberg.data.Record; |
| import org.apache.iceberg.io.InputFile; |
| import org.apache.iceberg.io.OutputFile; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| import org.apache.iceberg.types.Type; |
| import org.apache.iceberg.types.Types; |
| import org.apache.iceberg.types.Types.BinaryType; |
| import org.apache.iceberg.types.Types.BooleanType; |
| import org.apache.iceberg.types.Types.DateType; |
| import org.apache.iceberg.types.Types.DecimalType; |
| import org.apache.iceberg.types.Types.DoubleType; |
| import org.apache.iceberg.types.Types.FixedType; |
| import org.apache.iceberg.types.Types.FloatType; |
| import org.apache.iceberg.types.Types.IntegerType; |
| import org.apache.iceberg.types.Types.ListType; |
| import org.apache.iceberg.types.Types.LongType; |
| import org.apache.iceberg.types.Types.MapType; |
| import org.apache.iceberg.types.Types.StringType; |
| import org.apache.iceberg.types.Types.StructType; |
| import org.apache.iceberg.types.Types.TimeType; |
| import org.apache.iceberg.types.Types.TimestampType; |
| import org.apache.iceberg.util.DateTimeUtil; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.Test; |
| |
| import static org.apache.iceberg.types.Conversions.fromByteBuffer; |
| import static org.apache.iceberg.types.Types.NestedField.optional; |
| import static org.apache.iceberg.types.Types.NestedField.required; |
| |
| /** |
| * Tests for Metrics. |
| */ |
| public abstract class TestMetrics { |
| |
| private static final StructType LEAF_STRUCT_TYPE = StructType.of( |
| optional(5, "leafLongCol", LongType.get()), |
| optional(6, "leafBinaryCol", BinaryType.get()) |
| ); |
| |
| private static final StructType NESTED_STRUCT_TYPE = StructType.of( |
| required(3, "longCol", LongType.get()), |
| required(4, "leafStructCol", LEAF_STRUCT_TYPE), |
| required(7, "doubleCol", DoubleType.get()) |
| ); |
| |
| private static final Schema NESTED_SCHEMA = new Schema( |
| required(1, "intCol", IntegerType.get()), |
| required(2, "nestedStructCol", NESTED_STRUCT_TYPE) |
| ); |
| |
| private static final Schema SIMPLE_SCHEMA = new Schema( |
| optional(1, "booleanCol", BooleanType.get()), |
| required(2, "intCol", IntegerType.get()), |
| optional(3, "longCol", LongType.get()), |
| required(4, "floatCol", FloatType.get()), |
| optional(5, "doubleCol", DoubleType.get()), |
| optional(6, "decimalCol", DecimalType.of(10, 2)), |
| required(7, "stringCol", StringType.get()), |
| optional(8, "dateCol", DateType.get()), |
| required(9, "timeCol", TimeType.get()), |
| required(10, "timestampColAboveEpoch", TimestampType.withoutZone()), |
| required(11, "fixedCol", FixedType.ofLength(4)), |
| required(12, "binaryCol", BinaryType.get()), |
| required(13, "timestampColBelowEpoch", TimestampType.withoutZone()) |
| ); |
| |
| private static final Schema FLOAT_DOUBLE_ONLY_SCHEMA = new Schema( |
| optional(1, "floatCol", FloatType.get()), |
| optional(2, "doubleCol", DoubleType.get()) |
| ); |
| |
| private static final Record FLOAT_DOUBLE_RECORD_1 = createRecordWithFloatAndDouble(1.2F, 3.4D); |
| private static final Record FLOAT_DOUBLE_RECORD_2 = createRecordWithFloatAndDouble(5.6F, 7.8D); |
| private static final Record NAN_ONLY_RECORD = createRecordWithFloatAndDouble(Float.NaN, Double.NaN); |
| |
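  // shared 4-byte payload for the fixed-length column (FixedType.ofLength(4))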
| private final byte[] fixed = "abcd".getBytes(StandardCharsets.UTF_8); |
| |
| private static Record createRecordWithFloatAndDouble(float floatValue, double doubleValue) { |
| Record record = GenericRecord.create(FLOAT_DOUBLE_ONLY_SCHEMA); |
| record.setField("floatCol", floatValue); |
| record.setField("doubleCol", doubleValue); |
| return record; |
| } |
| |
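  // Format-specific subclasses implement the hooks below: they write the given records with
  // the format's writer and return the Metrics collected for the resulting data file.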
| public abstract FileFormat fileFormat(); |
| |
| public abstract Metrics getMetrics(Schema schema, MetricsConfig metricsConfig, Record... records) throws IOException; |
| |
| public abstract Metrics getMetrics(Schema schema, Record... records) throws IOException; |
| |
| protected abstract Metrics getMetricsForRecordsWithSmallRowGroups(Schema schema, OutputFile outputFile, |
| Record... records) throws IOException; |
| |
| public abstract int splitCount(InputFile inputFile) throws IOException; |
| |
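  // Whether the format can flush multiple row groups (or equivalent chunks) per file. Formats
  // that do, such as Parquet, are expected to override this along with
  // getMetricsForRecordsWithSmallRowGroups.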
| public boolean supportsSmallRowGroups() { |
| return false; |
| } |
| |
| protected abstract OutputFile createOutputFile() throws IOException; |
| |
| @Test |
| public void testMetricsForRepeatedValues() throws IOException { |
| Record record = GenericRecord.create(SIMPLE_SCHEMA); |
| record.setField("booleanCol", true); |
| record.setField("intCol", 3); |
| record.setField("longCol", null); |
| record.setField("floatCol", Float.NaN); |
| record.setField("doubleCol", 2.0D); |
| record.setField("decimalCol", new BigDecimal("3.50")); |
| record.setField("stringCol", "AAA"); |
| record.setField("dateCol", DateTimeUtil.dateFromDays(1500)); |
| record.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); |
| record.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L)); |
| record.setField("fixedCol", fixed); |
| record.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); |
| record.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L)); |
| |
| Metrics metrics = getMetrics(SIMPLE_SCHEMA, record, record); |
| Assert.assertEquals(2L, (long) metrics.recordCount()); |
| assertCounts(1, 2L, 0L, metrics); |
| assertCounts(2, 2L, 0L, metrics); |
| assertCounts(3, 2L, 2L, metrics); |
| assertCounts(4, 2L, 0L, 2L, metrics); |
| assertCounts(5, 2L, 0L, 0L, metrics); |
| assertCounts(6, 2L, 0L, metrics); |
| assertCounts(7, 2L, 0L, metrics); |
| assertCounts(8, 2L, 0L, metrics); |
| assertCounts(9, 2L, 0L, metrics); |
| assertCounts(10, 2L, 0L, metrics); |
| assertCounts(11, 2L, 0L, metrics); |
| assertCounts(12, 2L, 0L, metrics); |
| assertCounts(13, 2L, 0L, metrics); |
| } |
| |
| @Test |
| public void testMetricsForTopLevelFields() throws IOException { |
| Record firstRecord = GenericRecord.create(SIMPLE_SCHEMA); |
| firstRecord.setField("booleanCol", true); |
| firstRecord.setField("intCol", 3); |
| firstRecord.setField("longCol", 5L); |
| firstRecord.setField("floatCol", 2.0F); |
| firstRecord.setField("doubleCol", 2.0D); |
| firstRecord.setField("decimalCol", new BigDecimal("3.50")); |
| firstRecord.setField("stringCol", "AAA"); |
| firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500)); |
| firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); |
| firstRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L)); |
| firstRecord.setField("fixedCol", fixed); |
| firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); |
| firstRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(-1_900_300L)); |
| Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA); |
| secondRecord.setField("booleanCol", false); |
| secondRecord.setField("intCol", Integer.MIN_VALUE); |
| secondRecord.setField("longCol", null); |
| secondRecord.setField("floatCol", 1.0F); |
| secondRecord.setField("doubleCol", null); |
| secondRecord.setField("decimalCol", null); |
| secondRecord.setField("stringCol", "ZZZ"); |
| secondRecord.setField("dateCol", null); |
| secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(3000L)); |
| secondRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(900L)); |
| secondRecord.setField("fixedCol", fixed); |
| secondRecord.setField("binaryCol", ByteBuffer.wrap("W".getBytes())); |
| secondRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(-7_000L)); |
| |
| Metrics metrics = getMetrics(SIMPLE_SCHEMA, firstRecord, secondRecord); |
| Assert.assertEquals(2L, (long) metrics.recordCount()); |
| assertCounts(1, 2L, 0L, metrics); |
| assertBounds(1, BooleanType.get(), false, true, metrics); |
| assertCounts(2, 2L, 0L, metrics); |
| assertBounds(2, IntegerType.get(), Integer.MIN_VALUE, 3, metrics); |
| assertCounts(3, 2L, 1L, metrics); |
| assertBounds(3, LongType.get(), 5L, 5L, metrics); |
| assertCounts(4, 2L, 0L, 0L, metrics); |
| assertBounds(4, FloatType.get(), 1.0F, 2.0F, metrics); |
| assertCounts(5, 2L, 1L, 0L, metrics); |
| assertBounds(5, DoubleType.get(), 2.0D, 2.0D, metrics); |
| assertCounts(6, 2L, 1L, metrics); |
| assertBounds(6, DecimalType.of(10, 2), new BigDecimal("3.50"), new BigDecimal("3.50"), metrics); |
| assertCounts(7, 2L, 0L, metrics); |
| assertBounds(7, StringType.get(), CharBuffer.wrap("AAA"), CharBuffer.wrap("ZZZ"), metrics); |
| assertCounts(8, 2L, 1L, metrics); |
| assertBounds(8, DateType.get(), 1500, 1500, metrics); |
| assertCounts(9, 2L, 0L, metrics); |
| assertBounds(9, TimeType.get(), 2000L, 3000L, metrics); |
| assertCounts(10, 2L, 0L, metrics); |
| assertBounds(10, TimestampType.withoutZone(), 0L, 900L, metrics); |
| assertCounts(11, 2L, 0L, metrics); |
| assertBounds(11, FixedType.ofLength(4), |
| ByteBuffer.wrap(fixed), ByteBuffer.wrap(fixed), metrics); |
| assertCounts(12, 2L, 0L, metrics); |
| assertBounds(12, BinaryType.get(), |
| ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics); |
| if (fileFormat() == FileFormat.ORC) { |
| // TODO: The special condition for ORC can be removed when ORC-342 is fixed |
| // ORC-342: ORC writer creates inaccurate timestamp data and stats 1 sec below epoch |
| // Values in the range `[1969-12-31 23:59:59.000,1969-12-31 23:59:59.999]` will have 1 sec added to them |
| // So the upper bound value of -7_000 micros becomes 993_000 micros |
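      // (-7_000 micros + 1_000_000 micros = 993_000 micros)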
| assertBounds(13, TimestampType.withoutZone(), -1_900_300L, 993_000L, metrics); |
| } else { |
| assertBounds(13, TimestampType.withoutZone(), -1_900_300L, -7_000L, metrics); |
| } |
| } |
| |
| @Test |
| public void testMetricsForDecimals() throws IOException { |
| Schema schema = new Schema( |
| required(1, "decimalAsInt32", DecimalType.of(4, 2)), |
| required(2, "decimalAsInt64", DecimalType.of(14, 2)), |
| required(3, "decimalAsFixed", DecimalType.of(22, 2)) |
| ); |
| |
| Record record = GenericRecord.create(schema); |
| record.setField("decimalAsInt32", new BigDecimal("2.55")); |
| record.setField("decimalAsInt64", new BigDecimal("4.75")); |
| record.setField("decimalAsFixed", new BigDecimal("5.80")); |
| |
| Metrics metrics = getMetrics(schema, record); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
| assertCounts(1, 1L, 0L, metrics); |
| assertBounds(1, DecimalType.of(4, 2), new BigDecimal("2.55"), new BigDecimal("2.55"), metrics); |
| assertCounts(2, 1L, 0L, metrics); |
| assertBounds(2, DecimalType.of(14, 2), new BigDecimal("4.75"), new BigDecimal("4.75"), metrics); |
| assertCounts(3, 1L, 0L, metrics); |
| assertBounds(3, DecimalType.of(22, 2), new BigDecimal("5.80"), new BigDecimal("5.80"), metrics); |
| } |
| |
| @Test |
| public void testMetricsForNestedStructFields() throws IOException { |
| Metrics metrics = getMetrics(NESTED_SCHEMA, buildNestedTestRecord()); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
| assertCounts(1, 1L, 0L, metrics); |
| assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics); |
| assertCounts(3, 1L, 0L, metrics); |
| assertBounds(3, LongType.get(), 100L, 100L, metrics); |
| assertCounts(5, 1L, 0L, metrics); |
| assertBounds(5, LongType.get(), 20L, 20L, metrics); |
| assertCounts(6, 1L, 0L, metrics); |
| assertBounds(6, BinaryType.get(), |
| ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); |
| assertCounts(7, 1L, 0L, 1L, metrics); |
| assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics); |
| } |
| |
| private Record buildNestedTestRecord() { |
| Record leafStruct = GenericRecord.create(LEAF_STRUCT_TYPE); |
| leafStruct.setField("leafLongCol", 20L); |
| leafStruct.setField("leafBinaryCol", ByteBuffer.wrap("A".getBytes())); |
| Record nestedStruct = GenericRecord.create(NESTED_STRUCT_TYPE); |
| nestedStruct.setField("longCol", 100L); |
| nestedStruct.setField("leafStructCol", leafStruct); |
| nestedStruct.setField("doubleCol", Double.NaN); |
| Record record = GenericRecord.create(NESTED_SCHEMA); |
| record.setField("intCol", Integer.MAX_VALUE); |
| record.setField("nestedStructCol", nestedStruct); |
| |
| return record; |
| } |
| |
| @Test |
| public void testMetricsForListAndMapElements() throws IOException { |
| StructType structType = StructType.of( |
| required(1, "leafIntCol", IntegerType.get()), |
| optional(2, "leafStringCol", StringType.get()) |
| ); |
| Schema schema = new Schema( |
| optional(3, "intListCol", ListType.ofRequired(4, IntegerType.get())), |
| optional(5, "mapCol", MapType.ofRequired(6, 7, StringType.get(), structType)) |
| ); |
| |
| Record record = GenericRecord.create(schema); |
| record.setField("intListCol", Lists.newArrayList(10, 11, 12)); |
| Record struct = GenericRecord.create(structType); |
| struct.setField("leafIntCol", 1); |
| struct.setField("leafStringCol", "BBB"); |
| Map<String, Record> map = Maps.newHashMap(); |
| map.put("4", struct); |
    record.setField("mapCol", map);
| |
| Metrics metrics = getMetrics(schema, record); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
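    // ORC does not currently report value counts for list and map element fields, so the
    // expected counts differ by format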
| if (fileFormat() != FileFormat.ORC) { |
| assertCounts(1, 1L, 0L, metrics); |
| assertCounts(2, 1L, 0L, metrics); |
| assertCounts(4, 3L, 0L, metrics); |
| assertCounts(6, 1L, 0L, metrics); |
| } else { |
| assertCounts(1, null, null, metrics); |
| assertCounts(2, null, null, metrics); |
| assertCounts(4, null, null, metrics); |
| assertCounts(6, null, null, metrics); |
| } |
| assertBounds(1, IntegerType.get(), null, null, metrics); |
| assertBounds(2, StringType.get(), null, null, metrics); |
| assertBounds(4, IntegerType.get(), null, null, metrics); |
| assertBounds(6, StringType.get(), null, null, metrics); |
| assertBounds(7, structType, null, null, metrics); |
| } |
| |
| @Test |
| public void testMetricsForNullColumns() throws IOException { |
| Schema schema = new Schema( |
| optional(1, "intCol", IntegerType.get()) |
| ); |
| Record firstRecord = GenericRecord.create(schema); |
| firstRecord.setField("intCol", null); |
| Record secondRecord = GenericRecord.create(schema); |
| secondRecord.setField("intCol", null); |
| |
| Metrics metrics = getMetrics(schema, firstRecord, secondRecord); |
| Assert.assertEquals(2L, (long) metrics.recordCount()); |
| assertCounts(1, 2L, 2L, metrics); |
| assertBounds(1, IntegerType.get(), null, null, metrics); |
| } |
| |
| @Test |
| public void testMetricsForNaNColumns() throws IOException { |
| Metrics metrics = getMetrics(FLOAT_DOUBLE_ONLY_SCHEMA, NAN_ONLY_RECORD, NAN_ONLY_RECORD); |
| Assert.assertEquals(2L, (long) metrics.recordCount()); |
| assertCounts(1, 2L, 0L, 2L, metrics); |
| assertCounts(2, 2L, 0L, 2L, metrics); |
    // below: current behavior; bounds will be null once NaN is excluded from upper/lower bounds
| assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics); |
| assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics); |
| } |
| |
| @Test |
| public void testColumnBoundsWithNaNValueAtFront() throws IOException { |
| Metrics metrics = getMetrics(FLOAT_DOUBLE_ONLY_SCHEMA, |
| NAN_ONLY_RECORD, FLOAT_DOUBLE_RECORD_1, FLOAT_DOUBLE_RECORD_2); |
| Assert.assertEquals(3L, (long) metrics.recordCount()); |
| assertCounts(1, 3L, 0L, 1L, metrics); |
| assertCounts(2, 3L, 0L, 1L, metrics); |
| |
    // below: current behavior; bounds will be non-NaN values once NaN is excluded from upper/lower bounds.
    // ORC and Parquet differ here because their min/max comparison implementations handle NaN differently.
| if (fileFormat() == FileFormat.ORC) { |
| assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics); |
| assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics); |
| } else { |
| assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics); |
| assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics); |
| } |
| } |
| |
| @Test |
| public void testColumnBoundsWithNaNValueInMiddle() throws IOException { |
| Metrics metrics = getMetrics(FLOAT_DOUBLE_ONLY_SCHEMA, |
| FLOAT_DOUBLE_RECORD_1, NAN_ONLY_RECORD, FLOAT_DOUBLE_RECORD_2); |
| Assert.assertEquals(3L, (long) metrics.recordCount()); |
| assertCounts(1, 3L, 0L, 1L, metrics); |
| assertCounts(2, 3L, 0L, 1L, metrics); |
| |
    // below: current behavior; bounds will be non-NaN values once NaN is excluded from upper/lower bounds.
    // ORC and Parquet differ here because their min/max comparison implementations handle NaN differently.
| if (fileFormat() == FileFormat.ORC) { |
| assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); |
| assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); |
| } else { |
| assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics); |
| assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics); |
| } |
| } |
| |
| @Test |
| public void testColumnBoundsWithNaNValueAtEnd() throws IOException { |
| Metrics metrics = getMetrics(FLOAT_DOUBLE_ONLY_SCHEMA, |
| FLOAT_DOUBLE_RECORD_1, FLOAT_DOUBLE_RECORD_2, NAN_ONLY_RECORD); |
| Assert.assertEquals(3L, (long) metrics.recordCount()); |
| assertCounts(1, 3L, 0L, 1L, metrics); |
| assertCounts(2, 3L, 0L, 1L, metrics); |
| |
    // below: current behavior; bounds will be non-NaN values once NaN is excluded from upper/lower bounds.
    // ORC and Parquet differ here because their min/max comparison implementations handle NaN differently.
| if (fileFormat() == FileFormat.ORC) { |
| assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); |
| assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); |
| } else { |
| assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics); |
| assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics); |
| } |
| } |
| |
| @Test |
| public void testMetricsForTopLevelWithMultipleRowGroup() throws Exception { |
| Assume.assumeTrue("Skip test for formats that do not support small row groups", supportsSmallRowGroups()); |
| |
| int recordCount = 201; |
| List<Record> records = new ArrayList<>(recordCount); |
| |
| for (int i = 0; i < recordCount; i++) { |
| Record newRecord = GenericRecord.create(SIMPLE_SCHEMA); |
| newRecord.setField("booleanCol", i == 0 ? false : true); |
| newRecord.setField("intCol", i + 1); |
| newRecord.setField("longCol", i == 0 ? null : i + 1L); |
| newRecord.setField("floatCol", i + 1.0F); |
| newRecord.setField("doubleCol", i == 0 ? null : i + 1.0D); |
| newRecord.setField("decimalCol", i == 0 ? null : new BigDecimal(i + "").add(new BigDecimal("1.00"))); |
| newRecord.setField("stringCol", "AAA"); |
| newRecord.setField("dateCol", DateTimeUtil.dateFromDays(i + 1)); |
| newRecord.setField("timeCol", DateTimeUtil.timeFromMicros(i + 1L)); |
| newRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(i + 1L)); |
| newRecord.setField("fixedCol", fixed); |
| newRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); |
| newRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros((i + 1L) * -1L)); |
| records.add(newRecord); |
| } |
| |
    // create a file with multiple row groups by writing with a small row-group size
| OutputFile outputFile = createOutputFile(); |
| Metrics metrics = getMetricsForRecordsWithSmallRowGroups(SIMPLE_SCHEMA, outputFile, records.toArray(new Record[0])); |
| InputFile recordsFile = outputFile.toInputFile(); |
| |
| Assert.assertNotNull(recordsFile); |
| // rowgroup size should be > 1 |
| Assert.assertEquals(3, splitCount(recordsFile)); |
| |
| Assert.assertEquals(201L, (long) metrics.recordCount()); |
| assertCounts(1, 201L, 0L, metrics); |
| assertBounds(1, Types.BooleanType.get(), false, true, metrics); |
| assertBounds(2, Types.IntegerType.get(), 1, 201, metrics); |
| assertCounts(3, 201L, 1L, metrics); |
| assertBounds(3, Types.LongType.get(), 2L, 201L, metrics); |
| assertCounts(4, 201L, 0L, 0L, metrics); |
| assertBounds(4, Types.FloatType.get(), 1.0F, 201.0F, metrics); |
| assertCounts(5, 201L, 1L, 0L, metrics); |
| assertBounds(5, Types.DoubleType.get(), 2.0D, 201.0D, metrics); |
| assertCounts(6, 201L, 1L, metrics); |
| assertBounds(6, Types.DecimalType.of(10, 2), new BigDecimal("2.00"), |
| new BigDecimal("201.00"), metrics); |
| } |
| |
| @Test |
| public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOException { |
| Assume.assumeTrue("Skip test for formats that do not support small row groups", supportsSmallRowGroups()); |
| |
| int recordCount = 201; |
| List<Record> records = Lists.newArrayListWithExpectedSize(recordCount); |
| |
| for (int i = 0; i < recordCount; i++) { |
| Record newLeafStruct = GenericRecord.create(LEAF_STRUCT_TYPE); |
| newLeafStruct.setField("leafLongCol", i + 1L); |
| newLeafStruct.setField("leafBinaryCol", ByteBuffer.wrap("A".getBytes())); |
| Record newNestedStruct = GenericRecord.create(NESTED_STRUCT_TYPE); |
| newNestedStruct.setField("longCol", i + 1L); |
| newNestedStruct.setField("leafStructCol", newLeafStruct); |
| newNestedStruct.setField("doubleCol", Double.NaN); |
| Record newRecord = GenericRecord.create(NESTED_SCHEMA); |
| newRecord.setField("intCol", i + 1); |
| newRecord.setField("nestedStructCol", newNestedStruct); |
| records.add(newRecord); |
| } |
| |
    // create a file with multiple row groups by writing with a small row-group size
| OutputFile outputFile = createOutputFile(); |
| Metrics metrics = getMetricsForRecordsWithSmallRowGroups(NESTED_SCHEMA, outputFile, records.toArray(new Record[0])); |
| InputFile recordsFile = outputFile.toInputFile(); |
| |
| Assert.assertNotNull(recordsFile); |
| // rowgroup size should be > 1 |
| Assert.assertEquals(3, splitCount(recordsFile)); |
| |
| Assert.assertEquals(201L, (long) metrics.recordCount()); |
| assertCounts(1, 201L, 0L, metrics); |
| assertBounds(1, IntegerType.get(), 1, 201, metrics); |
| assertCounts(3, 201L, 0L, metrics); |
| assertBounds(3, LongType.get(), 1L, 201L, metrics); |
| assertCounts(5, 201L, 0L, metrics); |
| assertBounds(5, LongType.get(), 1L, 201L, metrics); |
| assertCounts(6, 201L, 0L, metrics); |
| assertBounds(6, BinaryType.get(), |
| ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); |
| assertCounts(7, 201L, 0L, 201L, metrics); |
| assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics); |
| } |
| |
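  // The tests below exercise the write.metadata.metrics.default table property, which controls
  // how much metadata is collected per column: none, counts, truncate(n), or full.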
| @Test |
| public void testNoneMetricsMode() throws IOException { |
| Metrics metrics = getMetrics( |
| NESTED_SCHEMA, |
| MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "none")), |
| buildNestedTestRecord()); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
| Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); |
| assertCounts(1, null, null, metrics); |
| assertBounds(1, Types.IntegerType.get(), null, null, metrics); |
| assertCounts(3, null, null, metrics); |
| assertBounds(3, Types.LongType.get(), null, null, metrics); |
| assertCounts(5, null, null, metrics); |
| assertBounds(5, Types.LongType.get(), null, null, metrics); |
| assertCounts(6, null, null, metrics); |
| assertBounds(6, Types.BinaryType.get(), null, null, metrics); |
| assertCounts(7, null, null, metrics); |
| assertBounds(7, Types.DoubleType.get(), null, null, metrics); |
| } |
| |
| @Test |
| public void testCountsMetricsMode() throws IOException { |
| Metrics metrics = getMetrics( |
| NESTED_SCHEMA, |
| MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "counts")), |
| buildNestedTestRecord()); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
| Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); |
| assertCounts(1, 1L, 0L, metrics); |
| assertBounds(1, Types.IntegerType.get(), null, null, metrics); |
| assertCounts(3, 1L, 0L, metrics); |
| assertBounds(3, Types.LongType.get(), null, null, metrics); |
| assertCounts(5, 1L, 0L, metrics); |
| assertBounds(5, Types.LongType.get(), null, null, metrics); |
| assertCounts(6, 1L, 0L, metrics); |
| assertBounds(6, Types.BinaryType.get(), null, null, metrics); |
| assertCounts(7, 1L, 0L, 1L, metrics); |
| assertBounds(7, Types.DoubleType.get(), null, null, metrics); |
| } |
| |
| @Test |
| public void testFullMetricsMode() throws IOException { |
| Metrics metrics = getMetrics( |
| NESTED_SCHEMA, |
| MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "full")), |
| buildNestedTestRecord()); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
| Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); |
| assertCounts(1, 1L, 0L, metrics); |
| assertBounds(1, Types.IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics); |
| assertCounts(3, 1L, 0L, metrics); |
| assertBounds(3, Types.LongType.get(), 100L, 100L, metrics); |
| assertCounts(5, 1L, 0L, metrics); |
| assertBounds(5, Types.LongType.get(), 20L, 20L, metrics); |
| assertCounts(6, 1L, 0L, metrics); |
| assertBounds(6, Types.BinaryType.get(), |
| ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); |
| assertCounts(7, 1L, 0L, 1L, metrics); |
| assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics); |
| } |
| |
| @Test |
| public void testTruncateStringMetricsMode() throws IOException { |
| String colName = "str_to_truncate"; |
| Schema singleStringColSchema = new Schema( |
| required(1, colName, Types.StringType.get()) |
| ); |
| |
| String value = "Lorem ipsum dolor sit amet"; |
| Record record = GenericRecord.create(singleStringColSchema); |
| record.setField(colName, value); |
| |
| Metrics metrics = getMetrics( |
| singleStringColSchema, |
| MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "truncate(10)")), |
| record); |
| |
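    // truncate(10) keeps a 10-character prefix as the lower bound; the upper bound is the same
    // prefix with its last character incremented ('u' -> 'v') so that it still bounds the value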
| CharBuffer expectedMinBound = CharBuffer.wrap("Lorem ipsu"); |
| CharBuffer expectedMaxBound = CharBuffer.wrap("Lorem ipsv"); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
| Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); |
| assertCounts(1, 1L, 0L, metrics); |
| assertBounds(1, Types.StringType.get(), expectedMinBound, expectedMaxBound, metrics); |
| } |
| |
| @Test |
| public void testTruncateBinaryMetricsMode() throws IOException { |
| String colName = "bin_to_truncate"; |
| Schema singleBinaryColSchema = new Schema( |
| required(1, colName, Types.BinaryType.get()) |
| ); |
| |
    byte[] value = new byte[]{ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0x10, 0xA, 0xB };
| Record record = GenericRecord.create(singleBinaryColSchema); |
| record.setField(colName, ByteBuffer.wrap(value)); |
| |
| Metrics metrics = getMetrics( |
| singleBinaryColSchema, |
| MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "truncate(5)")), |
| record); |
| |
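    // truncate(5) keeps a 5-byte prefix as the lower bound; the upper bound is the same prefix
    // with its last byte incremented (0x5 -> 0x6) so that it still bounds the value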
| ByteBuffer expectedMinBounds = ByteBuffer.wrap(new byte[]{ 0x1, 0x2, 0x3, 0x4, 0x5 }); |
| ByteBuffer expectedMaxBounds = ByteBuffer.wrap(new byte[]{ 0x1, 0x2, 0x3, 0x4, 0x6 }); |
| Assert.assertEquals(1L, (long) metrics.recordCount()); |
| Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); |
| assertCounts(1, 1L, 0L, metrics); |
| assertBounds(1, Types.BinaryType.get(), expectedMinBounds, expectedMaxBounds, metrics); |
| } |
| |
| protected void assertCounts(int fieldId, Long valueCount, Long nullValueCount, Metrics metrics) { |
| assertCounts(fieldId, valueCount, nullValueCount, null, metrics); |
| } |
| |
| protected void assertCounts(int fieldId, Long valueCount, Long nullValueCount, Long nanValueCount, Metrics metrics) { |
| Map<Integer, Long> valueCounts = metrics.valueCounts(); |
| Map<Integer, Long> nullValueCounts = metrics.nullValueCounts(); |
| Map<Integer, Long> nanValueCounts = metrics.nanValueCounts(); |
| Assert.assertEquals(valueCount, valueCounts.get(fieldId)); |
| Assert.assertEquals(nullValueCount, nullValueCounts.get(fieldId)); |
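    // ORC metrics do not currently include NaN value counts, so only check the other formats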
| if (fileFormat() != FileFormat.ORC) { |
| Assert.assertEquals(nanValueCount, nanValueCounts.get(fieldId)); |
| } |
| } |
| |
| protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) { |
| Map<Integer, ByteBuffer> lowerBounds = metrics.lowerBounds(); |
| Map<Integer, ByteBuffer> upperBounds = metrics.upperBounds(); |
| |
| Assert.assertEquals( |
| lowerBound, |
| lowerBounds.containsKey(fieldId) ? fromByteBuffer(type, lowerBounds.get(fieldId)) : null); |
| Assert.assertEquals( |
| upperBound, |
| upperBounds.containsKey(fieldId) ? fromByteBuffer(type, upperBounds.get(fieldId)) : null); |
| } |
| |
| } |