/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source;

import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
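
/**
 * Tests how Iceberg's write metrics configuration controls the column-level metrics
 * (null value counts, value counts, lower/upper bounds) recorded for data files written
 * through the Spark source, using the table default mode and per-column overrides.
 */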
public class TestWriteMetricsConfig {

  private static final Configuration CONF = new Configuration();
  private static final Schema SIMPLE_SCHEMA =
      new Schema(
          optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
  private static final Schema COMPLEX_SCHEMA =
      new Schema(
          required(1, "longCol", Types.IntegerType.get()),
          optional(2, "strCol", Types.StringType.get()),
          required(
              3,
              "record",
              Types.StructType.of(
                  required(4, "id", Types.IntegerType.get()),
                  required(5, "data", Types.StringType.get()))));

  @Rule public TemporaryFolder temp = new TemporaryFolder();

  private static SparkSession spark = null;
  private static JavaSparkContext sc = null;

  @BeforeClass
  public static void startSpark() {
    TestWriteMetricsConfig.spark = SparkSession.builder().master("local[2]").getOrCreate();
    TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
  }

  @AfterClass
  public static void stopSpark() {
    SparkSession currentSpark = TestWriteMetricsConfig.spark;
    TestWriteMetricsConfig.spark = null;
    TestWriteMetricsConfig.sc = null;
    currentSpark.stop();
  }
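
  // With the default metrics mode set to "full", every column should get null counts,
  // value counts, and lower/upper bounds in the written data file.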
  @Test
  public void testFullMetricsCollectionForParquet() throws IOException {
    String tableLocation = temp.newFolder("iceberg-table").toString();
    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> properties = Maps.newHashMap();
    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "full");
    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);

    List<SimpleRecord> expectedRecords =
        Lists.newArrayList(
            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
    df.select("id", "data")
        .coalesce(1)
        .write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, "parquet")
        .mode(SaveMode.Append)
        .save(tableLocation);

    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
      DataFile file = task.file();
      Assert.assertEquals(2, file.nullValueCounts().size());
      Assert.assertEquals(2, file.valueCounts().size());
      Assert.assertEquals(2, file.lowerBounds().size());
      Assert.assertEquals(2, file.upperBounds().size());
    }
  }
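
  // With the "counts" mode, null and value counts are kept but lower/upper bounds are dropped.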
  @Test
  public void testCountMetricsCollectionForParquet() throws IOException {
    String tableLocation = temp.newFolder("iceberg-table").toString();
    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> properties = Maps.newHashMap();
    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);

    List<SimpleRecord> expectedRecords =
        Lists.newArrayList(
            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
    df.select("id", "data")
        .coalesce(1)
        .write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, "parquet")
        .mode(SaveMode.Append)
        .save(tableLocation);

    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
      DataFile file = task.file();
      Assert.assertEquals(2, file.nullValueCounts().size());
      Assert.assertEquals(2, file.valueCounts().size());
      Assert.assertTrue(file.lowerBounds().isEmpty());
      Assert.assertTrue(file.upperBounds().isEmpty());
    }
  }
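
  // With the "none" mode, no column-level metrics are recorded at all.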
  @Test
  public void testNoMetricsCollectionForParquet() throws IOException {
    String tableLocation = temp.newFolder("iceberg-table").toString();
    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> properties = Maps.newHashMap();
    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none");
    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);

    List<SimpleRecord> expectedRecords =
        Lists.newArrayList(
            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
    df.select("id", "data")
        .coalesce(1)
        .write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, "parquet")
        .mode(SaveMode.Append)
        .save(tableLocation);

    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
      DataFile file = task.file();
      Assert.assertTrue(file.nullValueCounts().isEmpty());
      Assert.assertTrue(file.valueCounts().isEmpty());
      Assert.assertTrue(file.lowerBounds().isEmpty());
      Assert.assertTrue(file.upperBounds().isEmpty());
    }
  }
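
  // A per-column override ("full" for the id column) wins over the table default ("counts"),
  // so bounds are recorded only for the id column.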
  @Test
  public void testCustomMetricCollectionForParquet() throws IOException {
    String tableLocation = temp.newFolder("iceberg-table").toString();
    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> properties = Maps.newHashMap();
    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
    properties.put("write.metadata.metrics.column.id", "full");
    Table table = tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation);

    List<SimpleRecord> expectedRecords =
        Lists.newArrayList(
            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
    df.select("id", "data")
        .coalesce(1)
        .write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, "parquet")
        .mode(SaveMode.Append)
        .save(tableLocation);

    Schema schema = table.schema();
    Types.NestedField id = schema.findField("id");
    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
      DataFile file = task.file();
      Assert.assertEquals(2, file.nullValueCounts().size());
      Assert.assertEquals(2, file.valueCounts().size());
      Assert.assertEquals(1, file.lowerBounds().size());
      Assert.assertTrue(file.lowerBounds().containsKey(id.fieldId()));
      Assert.assertEquals(1, file.upperBounds().size());
      Assert.assertTrue(file.upperBounds().containsKey(id.fieldId()));
    }
  }
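
  // A metrics override for a column that does not exist in the schema should fail table creation.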
  @Test
  public void testBadCustomMetricCollectionForParquet() throws IOException {
    String tableLocation = temp.newFolder("iceberg-table").toString();
    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Map<String, String> properties = Maps.newHashMap();
    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
    properties.put("write.metadata.metrics.column.ids", "full");

    Assertions.assertThatThrownBy(
            () -> tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation))
        .isInstanceOf(ValidationException.class)
        .hasMessageStartingWith(
            "Invalid metrics config, could not find column ids from table prop write.metadata.metrics.column.ids in schema table");
  }
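
  // Per-column overrides also apply to nested struct fields: longCol gets counts only,
  // record.id gets full metrics, and record.data gets bounds truncated to 2 bytes.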
  @Test
  public void testCustomMetricCollectionForNestedParquet() throws IOException {
    String tableLocation = temp.newFolder("iceberg-table").toString();
    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.builderFor(COMPLEX_SCHEMA).identity("strCol").build();
    Map<String, String> properties = Maps.newHashMap();
    properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none");
    properties.put("write.metadata.metrics.column.longCol", "counts");
    properties.put("write.metadata.metrics.column.record.id", "full");
    properties.put("write.metadata.metrics.column.record.data", "truncate(2)");
    Table table = tables.create(COMPLEX_SCHEMA, spec, properties, tableLocation);

    Iterable<InternalRow> rows = RandomData.generateSpark(COMPLEX_SCHEMA, 10, 0);
    JavaRDD<InternalRow> rdd = sc.parallelize(Lists.newArrayList(rows));
    Dataset<Row> df =
        spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(COMPLEX_SCHEMA), false);

    df.coalesce(1)
        .write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, "parquet")
        .mode(SaveMode.Append)
        .save(tableLocation);

    Schema schema = table.schema();
    Types.NestedField longCol = schema.findField("longCol");
    Types.NestedField recordId = schema.findField("record.id");
    Types.NestedField recordData = schema.findField("record.data");
    for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
      DataFile file = task.file();

      Map<Integer, Long> nullValueCounts = file.nullValueCounts();
      Assert.assertEquals(3, nullValueCounts.size());
      Assert.assertTrue(nullValueCounts.containsKey(longCol.fieldId()));
      Assert.assertTrue(nullValueCounts.containsKey(recordId.fieldId()));
      Assert.assertTrue(nullValueCounts.containsKey(recordData.fieldId()));

      Map<Integer, Long> valueCounts = file.valueCounts();
      Assert.assertEquals(3, valueCounts.size());
      Assert.assertTrue(valueCounts.containsKey(longCol.fieldId()));
      Assert.assertTrue(valueCounts.containsKey(recordId.fieldId()));
      Assert.assertTrue(valueCounts.containsKey(recordData.fieldId()));

      Map<Integer, ByteBuffer> lowerBounds = file.lowerBounds();
      Assert.assertEquals(2, lowerBounds.size());
      Assert.assertTrue(lowerBounds.containsKey(recordId.fieldId()));
      ByteBuffer recordDataLowerBound = lowerBounds.get(recordData.fieldId());
      Assert.assertEquals(2, ByteBuffers.toByteArray(recordDataLowerBound).length);

      Map<Integer, ByteBuffer> upperBounds = file.upperBounds();
      Assert.assertEquals(2, upperBounds.size());
      Assert.assertTrue(upperBounds.containsKey(recordId.fieldId()));
      ByteBuffer recordDataUpperBound = upperBounds.get(recordData.fieldId());
      Assert.assertEquals(2, ByteBuffers.toByteArray(recordDataUpperBound).length);
    }
  }
}