blob: ab29fb74e2d60abe2f5e39cee5bd9f2d884e850e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public abstract class TestWriterMetrics<T> {
private static final int FORMAT_V2 = 2;
protected static final Types.NestedField ID_FIELD = required(1, "id", Types.IntegerType.get());
protected static final Types.NestedField DATA_FIELD = optional(2, "data", Types.StringType.get());
protected static final Types.StructType NESTED_FIELDS =
Types.StructType.of(
required(4, "booleanField", Types.BooleanType.get()),
optional(5, "longValue", Types.LongType.get()));
protected static final Types.NestedField STRUCT_FIELD = optional(3, "structField", NESTED_FIELDS);
// create a schema with all supported fields
protected static final Schema SCHEMA = new Schema(ID_FIELD, DATA_FIELD, STRUCT_FIELD);
protected static final SortOrder sortOrder =
SortOrder.builderFor(SCHEMA).asc("id").asc("structField.longValue").build();
protected static final Map<String, String> properties =
ImmutableMap.of(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none");
@Rule public TemporaryFolder temp = new TemporaryFolder();
protected FileFormat fileFormat;
protected TestTables.TestTable table = null;
private OutputFileFactory fileFactory = null;
@Parameterized.Parameters(name = "FileFormat = {0}")
public static Object[][] parameters() {
return new Object[][] {{FileFormat.ORC}, {FileFormat.PARQUET}};
}
public TestWriterMetrics(FileFormat fileFormat) {
this.fileFormat = fileFormat;
}
protected abstract FileWriterFactory<T> newWriterFactory(Table sourceTable);
protected abstract T toRow(Integer id, String data, boolean boolValue, Long longValue);
protected abstract T toGenericRow(int value, int repeated);
@Before
public void setupTable() throws Exception {
File tableDir = temp.newFolder();
tableDir.delete(); // created by table create
this.table =
TestTables.create(
tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), sortOrder, FORMAT_V2);
table.updateProperties().set(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none").commit();
this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
}
@After
public void after() {
TestTables.clearTables();
}
@Test
public void verifySortedColMetric() throws Exception {
T row = toRow(3, "3", true, 3L);
DataWriter dataWriter =
newWriterFactory(table)
.newDataWriter(fileFactory.newOutputFile(), PartitionSpec.unpartitioned(), null);
dataWriter.write(row);
dataWriter.close();
DataFile dataFile = dataWriter.toDataFile();
// Only two sorted fields (id, structField.longValue) will have metrics
Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
Assert.assertFalse(lowerBounds.containsKey(2));
Assert.assertFalse(lowerBounds.containsKey(3));
Assert.assertFalse(lowerBounds.containsKey(4));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));
Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
Assert.assertFalse(upperBounds.containsKey(2));
Assert.assertFalse(upperBounds.containsKey(3));
Assert.assertFalse(upperBounds.containsKey(4));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}
@Test
public void testPositionDeleteMetrics() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table);
EncryptedOutputFile outputFile = fileFactory.newOutputFile();
PositionDeleteWriter<T> deleteWriter =
writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
try {
T deletedRow = toRow(3, "3", true, 3L);
PositionDelete<T> positionDelete = PositionDelete.create();
positionDelete.set("File A", 1, deletedRow);
deleteWriter.write(positionDelete);
} finally {
deleteWriter.close();
}
DeleteFile deleteFile = deleteWriter.toDeleteFile();
int pathFieldId = MetadataColumns.DELETE_FILE_PATH.fieldId();
int posFieldId = MetadataColumns.DELETE_FILE_POS.fieldId();
// should have metrics for _file and _pos as well as two sorted fields (id,
// structField.longValue)
Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
Assert.assertEquals(
CharBuffer.wrap("File A"),
Conversions.fromByteBuffer(Types.StringType.get(), lowerBounds.get(pathFieldId)));
Assert.assertEquals(
1L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(posFieldId)));
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
Assert.assertFalse(lowerBounds.containsKey(2));
Assert.assertFalse(lowerBounds.containsKey(3));
Assert.assertFalse(lowerBounds.containsKey(4));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));
Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
Assert.assertEquals(
CharBuffer.wrap("File A"),
Conversions.fromByteBuffer(Types.StringType.get(), upperBounds.get(pathFieldId)));
Assert.assertEquals(
1L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(posFieldId)));
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
Assert.assertFalse(upperBounds.containsKey(2));
Assert.assertFalse(upperBounds.containsKey(3));
Assert.assertFalse(upperBounds.containsKey(4));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}
@Test
public void testMaxColumns() throws IOException {
File tableDir = temp.newFolder();
tableDir.delete(); // created by table create
int numColumns = TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT + 1;
List<Types.NestedField> fields = Lists.newArrayListWithCapacity(numColumns);
for (int i = 0; i < numColumns; i++) {
fields.add(required(i, "col" + i, Types.IntegerType.get()));
}
Schema maxColSchema = new Schema(fields);
Table maxColumnTable =
TestTables.create(
tableDir,
"max_col_table",
maxColSchema,
PartitionSpec.unpartitioned(),
SortOrder.unsorted(),
FORMAT_V2);
OutputFileFactory maxColFactory =
OutputFileFactory.builderFor(maxColumnTable, 1, 1).format(fileFormat).build();
T row = toGenericRow(1, numColumns);
DataWriter dataWriter =
newWriterFactory(maxColumnTable)
.newDataWriter(maxColFactory.newOutputFile(), PartitionSpec.unpartitioned(), null);
dataWriter.add(row);
dataWriter.close();
DataFile dataFile = dataWriter.toDataFile();
// start at 1 because IDs were reassigned in the table
int id = 1;
for (; id <= TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT; id += 1) {
Assert.assertNotNull("Should have lower bound metrics", dataFile.lowerBounds().get(id));
Assert.assertNotNull("Should have upper bound metrics", dataFile.upperBounds().get(id));
Assert.assertNull(
"Should not have nan value metrics (not floating point)",
dataFile.nanValueCounts().get(id));
Assert.assertNotNull("Should have null value metrics", dataFile.nullValueCounts().get(id));
Assert.assertNotNull("Should have value metrics", dataFile.valueCounts().get(id));
}
// Remaining fields should not have metrics
for (; id <= numColumns; id += 1) {
Assert.assertNull("Should not have any lower bound metrics", dataFile.lowerBounds().get(id));
Assert.assertNull("Should not have any upper bound metrics", dataFile.upperBounds().get(id));
Assert.assertNull("Should not have any nan value metrics", dataFile.nanValueCounts().get(id));
Assert.assertNull(
"Should not have any null value metrics", dataFile.nullValueCounts().get(id));
Assert.assertNull("Should not have any value metrics", dataFile.valueCounts().get(id));
}
}
@Test
public void testMaxColumnsWithDefaultOverride() throws IOException {
File tableDir = temp.newFolder();
tableDir.delete(); // created by table create
int numColumns = TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT + 1;
List<Types.NestedField> fields = Lists.newArrayListWithCapacity(numColumns);
for (int i = 0; i < numColumns; i++) {
fields.add(required(i, "col" + i, Types.IntegerType.get()));
}
Schema maxColSchema = new Schema(fields);
Table maxColumnTable =
TestTables.create(
tableDir,
"max_col_table",
maxColSchema,
PartitionSpec.unpartitioned(),
SortOrder.unsorted(),
FORMAT_V2);
maxColumnTable
.updateProperties()
.set(
TableProperties.DEFAULT_WRITE_METRICS_MODE,
TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT)
.commit();
OutputFileFactory maxColFactory =
OutputFileFactory.builderFor(maxColumnTable, 1, 1).format(fileFormat).build();
T row = toGenericRow(1, numColumns);
DataWriter dataWriter =
newWriterFactory(maxColumnTable)
.newDataWriter(maxColFactory.newOutputFile(), PartitionSpec.unpartitioned(), null);
dataWriter.add(row);
dataWriter.close();
DataFile dataFile = dataWriter.toDataFile();
// Field should have metrics because the user set the default explicitly
Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
Map<Integer, ByteBuffer> lowerBounds = dataFile.upperBounds();
for (int i = 0; i < numColumns; i++) {
Assert.assertEquals(
1, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
Assert.assertEquals(
1, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
}
}
}