/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.metastore.iceberg.write;

import org.apache.drill.metastore.iceberg.IcebergBaseTest;
import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Tables;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
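
/**
 * Tests for {@link ParquetFileWriter}: writes Iceberg {@link Record}s of various
 * types into Parquet files and verifies the resulting file location, metrics
 * and round-tripped data. The writer is exercised through its fluent API:
 *
 * <pre>
 *   File result = new ParquetFileWriter(table)
 *     .records(records)
 *     .location(location)
 *     .name(fileName)
 *     .write();
 * </pre>
 */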
public class TestParquetFileWriter extends IcebergBaseTest {
private static Tables tables;
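
  // Shared Tables instance backed by the base Hadoop configuration from IcebergBaseTest.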
@BeforeClass
public static void init() {
tables = new HadoopTables(baseHadoopConfig());
}
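
  /**
   * Writes one record covering int, long, float, double, string, boolean,
   * list and map fields, then checks the written file's location, record
   * count and round-tripped values.
   */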
@Test
public void testAllTypes() throws Exception {
Schema schema = new Schema(
Types.NestedField.optional(1, "int_field", Types.IntegerType.get()),
Types.NestedField.optional(2, "long_field", Types.LongType.get()),
Types.NestedField.optional(3, "float_field", Types.FloatType.get()),
Types.NestedField.optional(4, "double_field", Types.DoubleType.get()),
Types.NestedField.optional(5, "string_field", Types.StringType.get()),
Types.NestedField.optional(6, "boolean_field", Types.BooleanType.get()),
Types.NestedField.optional(7, "list_field", Types.ListType.ofOptional(
9, Types.StringType.get())),
Types.NestedField.optional(8, "map_field", Types.MapType.ofOptional(
10, 11, Types.StringType.get(), Types.FloatType.get())));
List<String> listValue = Arrays.asList("a", "b", "c");
Map<String, Float> mapValue = new HashMap<>();
mapValue.put("a", 0.1F);
mapValue.put("b", 0.2F);
Record record = GenericRecord.create(schema);
record.setField("int_field", 1);
record.setField("long_field", 100L);
record.setField("float_field", 0.5F);
record.setField("double_field", 1.5D);
record.setField("string_field", "abc");
record.setField("boolean_field", true);
record.setField("list_field", listValue);
record.setField("map_field", mapValue);
String location = defaultFolder.newFolder("testAllTypes").toURI().getPath();
String fileName = "allTypes";
Table table = tables.create(schema, location);
org.apache.drill.metastore.iceberg.write.File result = new ParquetFileWriter(table)
.records(Collections.singletonList(record))
.location(location)
.name(fileName)
.write();
    String writePath = new Path(location, FileFormat.PARQUET.addExtension(fileName)).toUri().getPath();
    assertEquals(new Path(writePath), new Path(result.location()));
assertEquals(Long.valueOf(1), result.metrics().recordCount());
List<Record> rows = readData(result.input(), schema);
assertEquals(1, rows.size());
Record row = rows.get(0);
assertEquals(1, row.getField("int_field"));
assertEquals(100L, row.getField("long_field"));
assertEquals(0.5F, row.getField("float_field"));
assertEquals(1.5D, row.getField("double_field"));
assertEquals("abc", row.getField("string_field"));
assertEquals(true, row.getField("boolean_field"));
assertEquals(listValue, row.getField("list_field"));
assertEquals(mapValue, row.getField("map_field"));
}
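
  /**
   * Verifies that null values and empty strings, lists and maps are written
   * and read back without being confused with one another.
   */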
@Test
public void testNullAndEmptyValues() throws Exception {
Schema schema = new Schema(
Types.NestedField.optional(1, "int_null_field", Types.IntegerType.get()),
Types.NestedField.optional(2, "string_null_field", Types.StringType.get()),
Types.NestedField.optional(3, "string_empty_field", Types.StringType.get()),
Types.NestedField.optional(4, "boolean_null_field", Types.BooleanType.get()),
Types.NestedField.optional(5, "list_null_field", Types.ListType.ofOptional(
9, Types.StringType.get())),
Types.NestedField.optional(6, "list_empty_field", Types.ListType.ofOptional(
10, Types.StringType.get())),
Types.NestedField.optional(7, "map_null_field", Types.MapType.ofOptional(
11, 12, Types.StringType.get(), Types.FloatType.get())),
Types.NestedField.optional(8, "map_empty_field", Types.MapType.ofOptional(
13, 14, Types.StringType.get(), Types.FloatType.get())));
Record record = GenericRecord.create(schema);
record.setField("int_null_field", null);
record.setField("string_null_field", null);
record.setField("string_empty_field", "");
record.setField("boolean_null_field", null);
record.setField("list_null_field", null);
record.setField("list_empty_field", Collections.emptyList());
record.setField("map_null_field", null);
record.setField("map_empty_field", Collections.emptyMap());
String location = defaultFolder.newFolder("testNullAndEmptyValues").toURI().getPath();
Table table = tables.create(schema, location);
org.apache.drill.metastore.iceberg.write.File result = new ParquetFileWriter(table)
.records(Collections.singletonList(record))
.location(location)
.name("nullEmptyValues")
.write();
assertEquals(Long.valueOf(1), result.metrics().recordCount());
List<Record> rows = readData(result.input(), schema);
assertEquals(1, rows.size());
Record row = rows.get(0);
assertNull(row.getField("int_null_field"));
assertNull(row.getField("string_null_field"));
assertEquals("", row.getField("string_empty_field"));
assertNull(row.getField("boolean_null_field"));
assertNull(row.getField("list_null_field"));
assertEquals(Collections.emptyList(), row.getField("list_empty_field"));
assertNull(row.getField("map_null_field"));
assertEquals(Collections.emptyMap(), row.getField("map_empty_field"));
}
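
  /**
   * Writes a file with no records and expects a zero record count and no rows on read.
   */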
@Test
public void testEmptyFile() throws Exception {
Schema schema = new Schema(
Types.NestedField.optional(1, "int_field", Types.IntegerType.get()));
String location = defaultFolder.newFolder("testEmptyFile").toURI().getPath();
Table table = tables.create(schema, location);
org.apache.drill.metastore.iceberg.write.File result = new ParquetFileWriter(table)
.location(location)
.name("emptyFile")
.write();
assertEquals(Long.valueOf(0), result.metrics().recordCount());
List<Record> rows = readData(result.input(), schema);
assertEquals(0, rows.size());
}
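
  /**
   * Writes several records, including nulls, then checks the record count,
   * the per-field value and null counts in the metrics, and row order on read.
   */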
@Test
public void testSeveralRecords() throws Exception {
int fieldIndex = 1;
Schema schema = new Schema(
Types.NestedField.optional(fieldIndex, "int_field", Types.IntegerType.get()));
List<Integer> values = Arrays.asList(1, 2, 3, 3, null, null, null);
List<Record> records = values.stream()
.map(value -> {
Record record = GenericRecord.create(schema);
record.setField("int_field", value);
return record;
})
.collect(Collectors.toList());
String location = defaultFolder.newFolder("testSeveralRecords").toURI().getPath();
Table table = tables.create(schema, location);
org.apache.drill.metastore.iceberg.write.File result = new ParquetFileWriter(table)
.records(records)
.location(location)
.name("severalRecords")
.write();
assertEquals(Long.valueOf(7), result.metrics().recordCount());
assertEquals(Long.valueOf(7), result.metrics().valueCounts().get(fieldIndex));
assertEquals(Long.valueOf(3), result.metrics().nullValueCounts().get(fieldIndex));
List<Record> rows = readData(result.input(), schema);
assertEquals(7, rows.size());
List<Integer> actual = rows.stream()
.map(row -> (Integer) row.getField("int_field"))
.collect(Collectors.toList());
assertEquals(values, actual);
}
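
  /**
   * A value whose Java type does not match the declared Iceberg type
   * must fail the write with {@link IcebergMetastoreException}.
   */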
@Test
public void testTypeMismatch() throws Exception {
Schema schema = new Schema(
Types.NestedField.optional(1, "int_field", Types.IntegerType.get()));
Record record = GenericRecord.create(schema);
record.setField("int_field", 1);
record.setField("int_field", "abc");
String location = defaultFolder.newFolder("testTypeMismatch").toURI().getPath();
Table table = tables.create(schema, location);
thrown.expect(IcebergMetastoreException.class);
new ParquetFileWriter(table)
.records(Collections.singletonList(record))
.location(location)
.name("typeMismatch")
.write();
}
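
  /**
   * Writing must fail with {@link IcebergMetastoreException} when the target
   * file already exists at the destination.
   */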
@Test
public void testWriteIntoExistingFile() throws Exception {
Schema schema = new Schema(
Types.NestedField.optional(1, "int_field", Types.IntegerType.get()));
Record record = GenericRecord.create(schema);
record.setField("int_field", 1);
String fileName = "existingFile";
String location = defaultFolder.newFolder("testWriteIntoExistingFile").toURI().getPath();
Table table = tables.create(schema, location);
java.nio.file.Path file = Paths.get(new File(location, FileFormat.PARQUET.addExtension(fileName)).getPath());
Files.write(file, Collections.singletonList("abc"));
thrown.expect(IcebergMetastoreException.class);
new ParquetFileWriter(table)
.records(Collections.singletonList(record))
.location(location)
.name(fileName)
.write();
}
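
  /**
   * Reads all rows from the given Parquet input file, projecting them onto the given schema.
   */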
private List<Record> readData(InputFile inputFile, Schema schema) throws IOException {
try (CloseableIterable<Record> reader = Parquet.read(inputFile)
.project(schema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
return Lists.newArrayList(reader);
}
}
}