/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types.GroupBuilder;
import org.apache.parquet.schema.Types.ListBuilder;
import org.apache.parquet.schema.Types.MapBuilder;
import org.apache.parquet.schema.Types.PrimitiveBuilder;
import org.junit.Assert;
import org.junit.Test;

public class TestParquetSchemaUtil {
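  // A struct with one field of each primitive type exercised by the Parquet <-> Iceberg conversion.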
private static final Types.StructType SUPPORTED_PRIMITIVES =
Types.StructType.of(
required(100, "id", Types.LongType.get()),
optional(101, "data", Types.StringType.get()),
required(102, "b", Types.BooleanType.get()),
optional(103, "i", Types.IntegerType.get()),
required(104, "l", Types.LongType.get()),
optional(105, "f", Types.FloatType.get()),
required(106, "d", Types.DoubleType.get()),
optional(107, "date", Types.DateType.get()),
required(108, "ts", Types.TimestampType.withZone()),
required(110, "s", Types.StringType.get()),
required(112, "fixed", Types.FixedType.ofLength(7)),
optional(113, "bytes", Types.BinaryType.get()),
required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
          required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's maximum precision
);
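  // Round-trip check: stripping the field IDs from a converted Parquet schema and re-applying them
  // through a NameMapping should reproduce the original ID assignments for deeply nested types.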
@Test
public void testAssignIdsByNameMapping() {
Types.StructType structType =
Types.StructType.of(
required(0, "id", Types.LongType.get()),
optional(
1,
"list_of_maps",
Types.ListType.ofOptional(
2,
Types.MapType.ofOptional(3, 4, Types.StringType.get(), SUPPORTED_PRIMITIVES))),
optional(
5,
"map_of_lists",
Types.MapType.ofOptional(
6,
7,
Types.StringType.get(),
Types.ListType.ofOptional(8, SUPPORTED_PRIMITIVES))),
required(
9,
"list_of_lists",
Types.ListType.ofOptional(10, Types.ListType.ofOptional(11, SUPPORTED_PRIMITIVES))),
required(
12,
"map_of_maps",
Types.MapType.ofOptional(
13,
14,
Types.StringType.get(),
Types.MapType.ofOptional(
15, 16, Types.StringType.get(), SUPPORTED_PRIMITIVES))),
required(
17,
"list_of_struct_of_nested_types",
Types.ListType.ofOptional(
19,
Types.StructType.of(
Types.NestedField.required(
20,
"m1",
Types.MapType.ofOptional(
21, 22, Types.StringType.get(), SUPPORTED_PRIMITIVES)),
Types.NestedField.optional(
23, "l1", Types.ListType.ofRequired(24, SUPPORTED_PRIMITIVES)),
Types.NestedField.required(
25, "l2", Types.ListType.ofRequired(26, SUPPORTED_PRIMITIVES)),
Types.NestedField.optional(
27,
"m2",
Types.MapType.ofOptional(
28, 29, Types.StringType.get(), SUPPORTED_PRIMITIVES))))));
Schema schema =
new Schema(
TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet)
.asStructType()
.fields());
NameMapping nameMapping = MappingUtil.create(schema);
MessageType messageTypeWithIds = ParquetSchemaUtil.convert(schema, "parquet_type");
MessageType messageTypeWithIdsFromNameMapping =
ParquetSchemaUtil.applyNameMapping(RemoveIds.removeIds(messageTypeWithIds), nameMapping);
Assert.assertEquals(messageTypeWithIds, messageTypeWithIdsFromNameMapping);
}
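  // convertAndPrune should keep only Parquet fields that carry a field ID and drop the rest,
  // including nested struct fields, list elements, and map keys/values without IDs.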
@Test
public void testSchemaConversionWithoutAssigningIds() {
MessageType messageType =
new MessageType(
"test",
primitive(1, "int_col", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(2, "double_col", PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL),
primitive(null, "long_col", PrimitiveTypeName.INT64, Repetition.OPTIONAL),
struct(
3,
"struct_col_1",
Repetition.REQUIRED,
primitive(4, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(null, "n2", PrimitiveTypeName.INT64, Repetition.OPTIONAL),
primitive(5, "n3", PrimitiveTypeName.INT64, Repetition.OPTIONAL)),
struct(
6,
"struct_col_2",
Repetition.OPTIONAL,
primitive(null, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(null, "n2", PrimitiveTypeName.INT64, Repetition.OPTIONAL),
primitive(null, "n3", PrimitiveTypeName.INT64, Repetition.OPTIONAL)),
list(
null,
"list_col_1",
Repetition.REQUIRED,
primitive(7, "i", PrimitiveTypeName.INT32, Repetition.OPTIONAL)),
list(
8,
"list_col_2",
Repetition.REQUIRED,
primitive(null, "i", PrimitiveTypeName.INT32, Repetition.OPTIONAL)),
list(
9,
"list_col_3",
Repetition.OPTIONAL,
struct(
null,
"s",
Repetition.REQUIRED,
primitive(10, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(11, "n2", PrimitiveTypeName.INT64, Repetition.OPTIONAL))),
list(
12,
"list_col_4",
Repetition.REQUIRED,
struct(
13,
"s",
Repetition.REQUIRED,
primitive(null, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(null, "n2", PrimitiveTypeName.INT64, Repetition.OPTIONAL))),
list(
14,
"list_col_5",
Repetition.OPTIONAL,
struct(
15,
"s",
Repetition.REQUIRED,
primitive(16, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(17, "n2", PrimitiveTypeName.INT64, Repetition.OPTIONAL))),
map(
null,
"map_col_1",
Repetition.REQUIRED,
primitive(18, "k", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(19, "v", PrimitiveTypeName.INT32, Repetition.REQUIRED)),
map(
20,
"map_col_2",
Repetition.OPTIONAL,
primitive(null, "k", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(21, "v", PrimitiveTypeName.INT32, Repetition.REQUIRED)),
map(
22,
"map_col_3",
Repetition.REQUIRED,
primitive(null, "k", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(null, "v", PrimitiveTypeName.INT32, Repetition.REQUIRED)),
map(
23,
"map_col_4",
Repetition.OPTIONAL,
primitive(24, "k", PrimitiveTypeName.INT32, Repetition.REQUIRED),
struct(
25,
"s",
Repetition.REQUIRED,
primitive(null, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(26, "n2", PrimitiveTypeName.INT64, Repetition.OPTIONAL),
primitive(null, "n3", PrimitiveTypeName.INT64, Repetition.OPTIONAL))),
map(
27,
"map_col_5",
Repetition.REQUIRED,
primitive(28, "k", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(29, "v", PrimitiveTypeName.INT32, Repetition.REQUIRED)));
Schema expectedSchema =
new Schema(
required(1, "int_col", Types.IntegerType.get()),
optional(2, "double_col", Types.DoubleType.get()),
required(
3,
"struct_col_1",
Types.StructType.of(
required(4, "n1", Types.IntegerType.get()),
optional(5, "n3", Types.LongType.get()))),
optional(
14,
"list_col_5",
Types.ListType.ofRequired(
15,
Types.StructType.of(
required(16, "n1", Types.IntegerType.get()),
optional(17, "n2", Types.LongType.get())))),
optional(
23,
"map_col_4",
Types.MapType.ofRequired(
24,
25,
Types.IntegerType.get(),
Types.StructType.of(optional(26, "n2", Types.LongType.get())))),
required(
27,
"map_col_5",
Types.MapType.ofRequired(
28, 29, Types.IntegerType.get(), Types.IntegerType.get())));
Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
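  // Hive-style lists wrap the element in a repeated "bag" group; after a NameMapping assigns IDs,
  // conversion should produce the equivalent Iceberg list-of-struct schema.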
@Test
public void testSchemaConversionForHiveStyleLists() {
String parquetSchemaString =
"message spark_schema {\n"
+ " optional group col1 (LIST) {\n"
+ " repeated group bag {\n"
+ " optional group array {\n"
+ " required int32 col2;\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
MessageType messageType = MessageTypeParser.parseMessageType(parquetSchemaString);
Schema expectedSchema =
new Schema(
optional(
1,
"col1",
Types.ListType.ofOptional(
2, Types.StructType.of(required(3, "col2", Types.IntegerType.get())))));
NameMapping nameMapping = MappingUtil.create(expectedSchema);
MessageType messageTypeWithIds = ParquetSchemaUtil.applyNameMapping(messageType, nameMapping);
Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageTypeWithIds);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
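  // Legacy two-level list with a primitive "repeated binary array" element; the Parquet schema has
  // no field IDs, so the converted schema uses IDs generated during conversion.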
@Test
public void testLegacyTwoLevelListTypeWithPrimitiveElement() {
String parquetSchemaString =
"message spark_schema {\n"
+ " optional group arraybytes (LIST) {\n"
+ " repeated binary array;\n"
+ " }\n"
+ "}\n";
MessageType messageType = MessageTypeParser.parseMessageType(parquetSchemaString);
Schema expectedSchema =
new Schema(
optional(1, "arraybytes", Types.ListType.ofRequired(1000, Types.BinaryType.get())));
Schema actualSchema = ParquetSchemaUtil.convert(messageType);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
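  // Legacy two-level list whose repeated "element" group has two fields, so the repeated group
  // itself becomes the list's struct element.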
@Test
public void testLegacyTwoLevelListTypeWithGroupTypeElementWithTwoFields() {
String messageType =
"message root {"
+ " required group f0 {"
+ " required group f00 (LIST) {"
+ " repeated group element {"
+ " required int32 f000;"
+ " optional int64 f001;"
+ " }"
+ " }"
+ " }"
+ "}";
    MessageType parquetSchema = MessageTypeParser.parseMessageType(messageType);
Schema expectedSchema =
new Schema(
required(
1,
"f0",
Types.StructType.of(
required(
1003,
"f00",
Types.ListType.ofRequired(
1002,
Types.StructType.of(
required(1000, "f000", Types.IntegerType.get()),
optional(1001, "f001", Types.LongType.get())))))));
    Schema actualSchema = ParquetSchemaUtil.convert(parquetSchema);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
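  // Legacy two-level list as written by parquet-avro: the repeated group is named "array" and
  // holds the element fields.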
@Test
public void testLegacyTwoLevelListGenByParquetAvro() {
String messageType =
"message root {"
+ " optional group my_list (LIST) {"
+ " repeated group array {"
+ " required binary str (UTF8);"
+ " }"
+ " }"
+ "}";
    MessageType parquetSchema = MessageTypeParser.parseMessageType(messageType);
Schema expectedSchema =
new Schema(
optional(
1,
"my_list",
Types.ListType.ofRequired(
1001, Types.StructType.of(required(1000, "str", Types.StringType.get())))));
    Schema actualSchema = ParquetSchemaUtil.convert(parquetSchema);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
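  // Legacy two-level list as written by parquet-thrift: the repeated group is named
  // "<list name>_tuple".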
@Test
public void testLegacyTwoLevelListGenByParquetThrift() {
String messageType =
"message root {"
+ " optional group my_list (LIST) {"
+ " repeated group my_list_tuple {"
+ " required binary str (UTF8);"
+ " }"
+ " }"
+ "}";
    MessageType parquetSchema = MessageTypeParser.parseMessageType(messageType);
Schema expectedSchema =
new Schema(
optional(
1,
"my_list",
Types.ListType.ofRequired(
1001, Types.StructType.of(required(1000, "str", Types.StringType.get())))));
    Schema actualSchema = ParquetSchemaUtil.convert(parquetSchema);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
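  // Nested legacy two-level lists from parquet-thrift: the repeated "_tuple" group is itself a
  // LIST of repeated int32 values.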
@Test
public void testLegacyTwoLevelListGenByParquetThrift1() {
String messageType =
"message root {"
+ " optional group my_list (LIST) {"
+ " repeated group my_list_tuple (LIST) {"
+ " repeated int32 my_list_tuple_tuple;"
+ " }"
+ " }"
+ "}";
    MessageType parquetSchema = MessageTypeParser.parseMessageType(messageType);
Schema expectedSchema =
new Schema(
optional(
1,
"my_list",
Types.ListType.ofRequired(
1001, Types.ListType.ofRequired(1000, Types.IntegerType.get()))));
    Schema actualSchema = ParquetSchemaUtil.convert(parquetSchema);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
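  // Helpers that build Parquet types with an optional field ID; passing a null id leaves the type
  // without an ID so that pruning behavior can be exercised.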
private Type primitive(
Integer id, String name, PrimitiveTypeName typeName, Repetition repetition) {
PrimitiveBuilder<PrimitiveType> builder =
org.apache.parquet.schema.Types.primitive(typeName, repetition);
if (id != null) {
builder.id(id);
}
return builder.named(name);
}
private Type struct(Integer id, String name, Repetition repetition, Type... types) {
GroupBuilder<GroupType> builder = org.apache.parquet.schema.Types.buildGroup(repetition);
builder.addFields(types);
if (id != null) {
builder.id(id);
}
return builder.named(name);
}
private Type list(Integer id, String name, Repetition repetition, Type elementType) {
ListBuilder<GroupType> builder = org.apache.parquet.schema.Types.list(repetition);
builder.element(elementType);
if (id != null) {
builder.id(id);
}
return builder.named(name);
}
private Type map(Integer id, String name, Repetition repetition, Type keyType, Type valueType) {
MapBuilder<GroupType> builder = org.apache.parquet.schema.Types.map(repetition);
builder.key(keyType);
builder.value(valueType);
if (id != null) {
builder.id(id);
}
return builder.named(name);
}
}