blob: f5b348b6b03a916072301f493f39e22bedf08fb0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.avro;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import static org.apache.avro.Schema.Type.INT;
import static org.apache.avro.Schema.Type.LONG;
import static org.apache.parquet.avro.AvroTestUtil.field;
import static org.apache.parquet.avro.AvroTestUtil.optionalField;
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;
import static org.apache.parquet.schema.OriginalType.DATE;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
import static org.apache.parquet.schema.OriginalType.TIME_MICROS;
import static org.apache.parquet.schema.OriginalType.TIME_MILLIS;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
public class TestAvroSchemaConverter {
private static final Configuration NEW_BEHAVIOR = new Configuration(false);
@BeforeClass
public static void setupConf() {
NEW_BEHAVIOR.setBoolean("parquet.avro.add-list-element-records", false);
NEW_BEHAVIOR.setBoolean("parquet.avro.write-old-list-structure", false);
}
public static final String ALL_PARQUET_SCHEMA =
"message org.apache.parquet.avro.myrecord {\n" +
" required boolean myboolean;\n" +
" required int32 myint;\n" +
" required int64 mylong;\n" +
" required float myfloat;\n" +
" required double mydouble;\n" +
" required binary mybytes;\n" +
" required binary mystring (UTF8);\n" +
" required group mynestedrecord {\n" +
" required int32 mynestedint;\n" +
" }\n" +
" required binary myenum (ENUM);\n" +
" required group myarray (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" optional group myoptionalarray (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" required group myarrayofoptional (LIST) {\n" +
" repeated group list {\n" +
" optional int32 element;\n" +
" }\n" +
" }\n" +
" required group myrecordarray (LIST) {\n" +
" repeated group array {\n" +
" required int32 a;\n" +
" required int32 b;\n" +
" }\n" +
" }\n" +
" required group mymap (MAP) {\n" +
" repeated group map (MAP_KEY_VALUE) {\n" +
" required binary key (UTF8);\n" +
" required int32 value;\n" +
" }\n" +
" }\n" +
" required fixed_len_byte_array(1) myfixed;\n" +
"}\n";
private void testAvroToParquetConversion(
Schema avroSchema, String schemaString) throws Exception {
testAvroToParquetConversion(new Configuration(false), avroSchema, schemaString);
}
private void testAvroToParquetConversion(
Configuration conf, Schema avroSchema, String schemaString)
throws Exception {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
MessageType schema = avroSchemaConverter.convert(avroSchema);
MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
schema.toString());
}
private void testParquetToAvroConversion(
Schema avroSchema, String schemaString) throws Exception {
testParquetToAvroConversion(new Configuration(false), avroSchema, schemaString);
}
private void testParquetToAvroConversion(
Configuration conf, Schema avroSchema, String schemaString)
throws Exception {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
Schema schema = avroSchemaConverter.convert(MessageTypeParser.parseMessageType
(schemaString));
assertEquals("converting " + schemaString + " to " + avroSchema, avroSchema.toString(),
schema.toString());
}
private void testRoundTripConversion(
Schema avroSchema, String schemaString) throws Exception {
testRoundTripConversion(new Configuration(), avroSchema, schemaString);
}
private void testRoundTripConversion(
Configuration conf, Schema avroSchema, String schemaString)
throws Exception {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
MessageType schema = avroSchemaConverter.convert(avroSchema);
MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
schema.toString());
Schema convertedAvroSchema = avroSchemaConverter.convert(expectedMT);
assertEquals("converting " + expectedMT + " to " + avroSchema.toString(true),
avroSchema.toString(), convertedAvroSchema.toString());
}
@Test(expected = IllegalArgumentException.class)
public void testTopLevelMustBeARecord() {
new AvroSchemaConverter().convert(Schema.create(INT));
}
@Test
public void testAllTypes() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("all.avsc").openStream());
testAvroToParquetConversion(
NEW_BEHAVIOR, schema,
"message org.apache.parquet.avro.myrecord {\n" +
// Avro nulls are not encoded, unless they are null unions
" required boolean myboolean;\n" +
" required int32 myint;\n" +
" required int64 mylong;\n" +
" required float myfloat;\n" +
" required double mydouble;\n" +
" required binary mybytes;\n" +
" required binary mystring (UTF8);\n" +
" required group mynestedrecord {\n" +
" required int32 mynestedint;\n" +
" }\n" +
" required binary myenum (ENUM);\n" +
" required group myarray (LIST) {\n" +
" repeated group list {\n" +
" required int32 element;\n" +
" }\n" +
" }\n" +
" required group myemptyarray (LIST) {\n" +
" repeated group list {\n" +
" required int32 element;\n" +
" }\n" +
" }\n" +
" optional group myoptionalarray (LIST) {\n" +
" repeated group list {\n" +
" required int32 element;\n" +
" }\n" +
" }\n" +
" required group myarrayofoptional (LIST) {\n" +
" repeated group list {\n" +
" optional int32 element;\n" +
" }\n" +
" }\n" +
" required group mymap (MAP) {\n" +
" repeated group map (MAP_KEY_VALUE) {\n" +
" required binary key (UTF8);\n" +
" required int32 value;\n" +
" }\n" +
" }\n" +
" required group myemptymap (MAP) {\n" +
" repeated group map (MAP_KEY_VALUE) {\n" +
" required binary key (UTF8);\n" +
" required int32 value;\n" +
" }\n" +
" }\n" +
" required fixed_len_byte_array(1) myfixed;\n" +
"}\n");
}
@Test
public void testAllTypesOldListBehavior() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("all.avsc").openStream());
testAvroToParquetConversion(
schema,
"message org.apache.parquet.avro.myrecord {\n" +
// Avro nulls are not encoded, unless they are null unions
" required boolean myboolean;\n" +
" required int32 myint;\n" +
" required int64 mylong;\n" +
" required float myfloat;\n" +
" required double mydouble;\n" +
" required binary mybytes;\n" +
" required binary mystring (UTF8);\n" +
" required group mynestedrecord {\n" +
" required int32 mynestedint;\n" +
" }\n" +
" required binary myenum (ENUM);\n" +
" required group myarray (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" required group myemptyarray (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" optional group myoptionalarray (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" required group myarrayofoptional (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" required group mymap (MAP) {\n" +
" repeated group map (MAP_KEY_VALUE) {\n" +
" required binary key (UTF8);\n" +
" required int32 value;\n" +
" }\n" +
" }\n" +
" required group myemptymap (MAP) {\n" +
" repeated group map (MAP_KEY_VALUE) {\n" +
" required binary key (UTF8);\n" +
" required int32 value;\n" +
" }\n" +
" }\n" +
" required fixed_len_byte_array(1) myfixed;\n" +
"}\n");
}
@Test
public void testAllTypesParquetToAvro() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("allFromParquetNewBehavior.avsc").openStream());
// Cannot use round-trip assertion because enum is lost
testParquetToAvroConversion(NEW_BEHAVIOR, schema, ALL_PARQUET_SCHEMA);
}
@Test
public void testAllTypesParquetToAvroOldBehavior() throws Exception {
Schema schema = new Schema.Parser().parse(
Resources.getResource("allFromParquetOldBehavior.avsc").openStream());
// Cannot use round-trip assertion because enum is lost
testParquetToAvroConversion(schema, ALL_PARQUET_SCHEMA);
}
@Test(expected = IllegalArgumentException.class)
public void testParquetMapWithNonStringKeyFails() throws Exception {
MessageType parquetSchema = MessageTypeParser.parseMessageType(
"message myrecord {\n" +
" required group mymap (MAP) {\n" +
" repeated group map (MAP_KEY_VALUE) {\n" +
" required int32 key;\n" +
" required int32 value;\n" +
" }\n" +
" }\n" +
"}\n"
);
new AvroSchemaConverter().convert(parquetSchema);
}
@Test
public void testOptionalFields() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
Schema optionalInt = optional(Schema.create(INT));
schema.setFields(Collections.singletonList(
new Schema.Field("myint", optionalInt, null, JsonProperties.NULL_VALUE)
));
testRoundTripConversion(
schema,
"message record1 {\n" +
" optional int32 myint;\n" +
"}\n");
}
@Test
public void testOptionalMapValue() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
Schema optionalIntMap = Schema.createMap(optional(Schema.create(INT)));
schema.setFields(Arrays.asList(
new Schema.Field("myintmap", optionalIntMap, null, null)
));
testRoundTripConversion(
schema,
"message record1 {\n" +
" required group myintmap (MAP) {\n" +
" repeated group map (MAP_KEY_VALUE) {\n" +
" required binary key (UTF8);\n" +
" optional int32 value;\n" +
" }\n" +
" }\n" +
"}\n");
}
@Test
public void testOptionalArrayElement() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT)));
schema.setFields(Arrays.asList(
new Schema.Field("myintarray", optionalIntArray, null, null)
));
testRoundTripConversion(
NEW_BEHAVIOR, schema,
"message record1 {\n" +
" required group myintarray (LIST) {\n" +
" repeated group list {\n" +
" optional int32 element;\n" +
" }\n" +
" }\n" +
"}\n");
}
@Test
public void testUnionOfTwoTypes() throws Exception {
Schema schema = Schema.createRecord("record2", null, null, false);
Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type
.NULL),
Schema.create(INT),
Schema.create(Schema.Type.FLOAT)));
schema.setFields(Arrays.asList(
new Schema.Field("myunion", multipleTypes, null, JsonProperties.NULL_VALUE)));
// Avro union is modelled using optional data members of the different
// types. This does not translate back into an Avro union
testAvroToParquetConversion(
schema,
"message record2 {\n" +
" optional group myunion {\n" +
" optional int32 member0;\n" +
" optional float member1;\n" +
" }\n" +
"}\n");
}
@Test
public void testArrayOfOptionalRecords() throws Exception {
Schema innerRecord = Schema.createRecord("element", null, null, false);
Schema optionalString = optional(Schema.create(Schema.Type.STRING));
innerRecord.setFields(Lists.newArrayList(
new Schema.Field("s1", optionalString, null, JsonProperties.NULL_VALUE),
new Schema.Field("s2", optionalString, null, JsonProperties.NULL_VALUE)
));
Schema schema = Schema.createRecord("HasArray", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("myarray", Schema.createArray(optional(innerRecord)),
null, null)
));
System.err.println("Avro schema: " + schema.toString(true));
testRoundTripConversion(NEW_BEHAVIOR, schema, "message HasArray {\n" +
" required group myarray (LIST) {\n" +
" repeated group list {\n" +
" optional group element {\n" +
" optional binary s1 (UTF8);\n" +
" optional binary s2 (UTF8);\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n");
}
@Test
public void testArrayOfOptionalRecordsOldBehavior() throws Exception {
Schema innerRecord = Schema.createRecord("InnerRecord", null, null, false);
Schema optionalString = optional(Schema.create(Schema.Type.STRING));
innerRecord.setFields(Lists.newArrayList(
new Schema.Field("s1", optionalString, null, JsonProperties.NULL_VALUE),
new Schema.Field("s2", optionalString, null, JsonProperties.NULL_VALUE)
));
Schema schema = Schema.createRecord("HasArray", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("myarray", Schema.createArray(optional(innerRecord)),
null, null)
));
System.err.println("Avro schema: " + schema.toString(true));
// Cannot use round-trip assertion because InnerRecord optional is removed
testAvroToParquetConversion(schema, "message HasArray {\n" +
" required group myarray (LIST) {\n" +
" repeated group array {\n" +
" optional binary s1 (UTF8);\n" +
" optional binary s2 (UTF8);\n" +
" }\n" +
" }\n" +
"}\n");
}
@Test
public void testOldAvroListOfLists() throws Exception {
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
Schema.create(INT))));
Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, JsonProperties.NULL_VALUE)
));
System.err.println("Avro schema: " + schema.toString(true));
testRoundTripConversion(schema,
"message AvroCompatListInList {\n" +
" optional group listOfLists (LIST) {\n" +
" repeated group array (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" }\n" +
"}");
// Cannot use round-trip assertion because 3-level representation is used
testParquetToAvroConversion(NEW_BEHAVIOR, schema,
"message AvroCompatListInList {\n" +
" optional group listOfLists (LIST) {\n" +
" repeated group array (LIST) {\n" +
" repeated int32 array;\n" +
" }\n" +
" }\n" +
"}");
}
@Test
public void testOldThriftListOfLists() throws Exception {
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
Schema.create(INT))));
Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, JsonProperties.NULL_VALUE)
));
System.err.println("Avro schema: " + schema.toString(true));
// Cannot use round-trip assertion because repeated group names differ
testParquetToAvroConversion(schema,
"message ThriftCompatListInList {\n" +
" optional group listOfLists (LIST) {\n" +
" repeated group listOfLists_tuple (LIST) {\n" +
" repeated int32 listOfLists_tuple_tuple;\n" +
" }\n" +
" }\n" +
"}");
// Cannot use round-trip assertion because 3-level representation is used
testParquetToAvroConversion(NEW_BEHAVIOR, schema,
"message ThriftCompatListInList {\n" +
" optional group listOfLists (LIST) {\n" +
" repeated group listOfLists_tuple (LIST) {\n" +
" repeated int32 listOfLists_tuple_tuple;\n" +
" }\n" +
" }\n" +
"}");
}
@Test
public void testUnknownTwoLevelListOfLists() throws Exception {
// This tests the case where we don't detect a 2-level list by the repeated
// group's name, but it must be 2-level because the repeated group doesn't
// contain an optional or repeated element as required for 3-level lists
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
Schema.create(INT))));
Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, JsonProperties.NULL_VALUE)
));
System.err.println("Avro schema: " + schema.toString(true));
// Cannot use round-trip assertion because repeated group names differ
testParquetToAvroConversion(schema,
"message UnknownTwoLevelListInList {\n" +
" optional group listOfLists (LIST) {\n" +
" repeated group mylist (LIST) {\n" +
" repeated int32 innerlist;\n" +
" }\n" +
" }\n" +
"}");
// Cannot use round-trip assertion because 3-level representation is used
testParquetToAvroConversion(NEW_BEHAVIOR, schema,
"message UnknownTwoLevelListInList {\n" +
" optional group listOfLists (LIST) {\n" +
" repeated group mylist (LIST) {\n" +
" repeated int32 innerlist;\n" +
" }\n" +
" }\n" +
"}");
}
@Test
public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception {
Schema schema = Schema.createRecord("myrecord", null, null, false);
Schema map = Schema.createMap(Schema.create(INT));
schema.setFields(Collections.singletonList(new Schema.Field("mymap", map, null, null)));
String parquetSchema =
"message myrecord {\n" +
" required group mymap (MAP) {\n" +
" repeated group map {\n" +
" required binary key (UTF8);\n" +
" required int32 value;\n" +
" }\n" +
" }\n" +
"}\n";
testParquetToAvroConversion(schema, parquetSchema);
testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
}
@Test
public void testDecimalBytesType() throws Exception {
Schema schema = Schema.createRecord("myrecord", null, null, false);
Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
Schema.create(Schema.Type.BYTES));
schema.setFields(Collections.singletonList(
new Schema.Field("dec", decimal, null, null)));
testRoundTripConversion(schema,
"message myrecord {\n" +
" required binary dec (DECIMAL(9,2));\n" +
"}\n");
}
@Test
public void testDecimalFixedType() throws Exception {
Schema schema = Schema.createRecord("myrecord", null, null, false);
Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
Schema.createFixed("dec", null, null, 8));
schema.setFields(Collections.singletonList(
new Schema.Field("dec", decimal, null, null)));
testRoundTripConversion(schema,
"message myrecord {\n" +
" required fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" +
"}\n");
}
@Test
public void testDecimalIntegerType() throws Exception {
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field(
"dec", Schema.create(INT), null, null)));
// the decimal portion is lost because it isn't valid in Avro
testParquetToAvroConversion(expected,
"message myrecord {\n" +
" required int32 dec (DECIMAL(9,2));\n" +
"}\n");
}
@Test
public void testDecimalLongType() throws Exception {
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("dec", Schema.create(LONG), null, null)));
// the decimal portion is lost because it isn't valid in Avro
testParquetToAvroConversion(expected,
"message myrecord {\n" +
" required int64 dec (DECIMAL(9,2));\n" +
"}\n");
}
@Test
public void testDateType() throws Exception {
Schema date = LogicalTypes.date().addToSchema(Schema.create(INT));
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("date", date, null, null)));
testRoundTripConversion(expected,
"message myrecord {\n" +
" required int32 date (DATE);\n" +
"}\n");
for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
{INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
if (primitive == FIXED_LEN_BYTE_ARRAY) {
type = new PrimitiveType(REQUIRED, primitive, 12, "test", DATE);
} else {
type = new PrimitiveType(REQUIRED, primitive, "test", DATE);
}
assertThrows("Should not allow TIME_MICROS with " + primitive,
IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@Test
public void testTimeMillisType() throws Exception {
Schema date = LogicalTypes.timeMillis().addToSchema(Schema.create(INT));
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("time", date, null, null)));
testRoundTripConversion(expected,
"message myrecord {\n" +
" required int32 time (TIME(MILLIS,true));\n" +
"}\n");
for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
{INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
if (primitive == FIXED_LEN_BYTE_ARRAY) {
type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MILLIS);
} else {
type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MILLIS);
}
assertThrows("Should not allow TIME_MICROS with " + primitive,
IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@Test
public void testTimeMicrosType() throws Exception {
Schema date = LogicalTypes.timeMicros().addToSchema(Schema.create(LONG));
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("time", date, null, null)));
testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 time (TIME(MICROS,true));\n" +
"}\n");
for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
{INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
if (primitive == FIXED_LEN_BYTE_ARRAY) {
type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MICROS);
} else {
type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MICROS);
}
assertThrows("Should not allow TIME_MICROS with " + primitive,
IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@Test
public void testTimestampMillisType() throws Exception {
Schema date = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG));
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("timestamp", date, null, null)));
testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 timestamp (TIMESTAMP(MILLIS,true));\n" +
"}\n");
for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
{INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
if (primitive == FIXED_LEN_BYTE_ARRAY) {
type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS);
} else {
type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS);
}
assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@Test
public void testTimestampMicrosType() throws Exception {
Schema date = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("timestamp", date, null, null)));
testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 timestamp (TIMESTAMP(MICROS,true));\n" +
"}\n");
for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
{INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
if (primitive == FIXED_LEN_BYTE_ARRAY) {
type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS);
} else {
type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS);
}
assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}
@Test
public void testReuseNameInNestedStructure() throws Exception {
Schema innerA1 = record("a1", "a12",
field("a4", primitive(Schema.Type.FLOAT)));
Schema outerA1 = record("a1",
field("a2", primitive(Schema.Type.FLOAT)),
optionalField("a1", innerA1));
Schema schema = record("Message",
optionalField("a1", outerA1));
String parquetSchema = "message Message {\n" +
" optional group a1 {\n" +
" required float a2;\n" +
" optional group a1 {\n" +
" required float a4;\n"+
" }\n" +
" }\n" +
"}\n";
testParquetToAvroConversion(schema, parquetSchema);
testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
}
@Test
public void testReuseNameInNestedStructureAtSameLevel() throws Exception {
Schema a2 = record("a2",
field("a4", primitive(Schema.Type.FLOAT)));
Schema a22 = record("a2", "a22",
field("a4", primitive(Schema.Type.FLOAT)),
field("a5", primitive(Schema.Type.FLOAT)));
Schema a1 = record("a1",
optionalField("a2", a2));
Schema a3 = record("a3",
optionalField("a2", a22));
Schema schema = record("Message",
optionalField("a1", a1),
optionalField("a3", a3));
String parquetSchema = "message Message {\n" +
" optional group a1 {\n" +
" optional group a2 {\n" +
" required float a4;\n"+
" }\n" +
" }\n" +
" optional group a3 {\n" +
" optional group a2 {\n" +
" required float a4;\n"+
" required float a5;\n"+
" }\n" +
" }\n" +
"}\n";
testParquetToAvroConversion(schema, parquetSchema);
testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
}
public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
original));
}
public static MessageType message(PrimitiveType primitive) {
return Types.buildMessage()
.addField(primitive)
.named("myrecord");
}
/**
* A convenience method to avoid a large number of @Test(expected=...) tests
* @param message A String message to describe this assertion
* @param expected An Exception class that the Runnable should throw
* @param runnable A Runnable that is expected to throw the exception
*/
public static void assertThrows(
String message, Class<? extends Exception> expected, Runnable runnable) {
try {
runnable.run();
Assert.fail("No exception was thrown (" + message + "), expected: " +
expected.getName());
} catch (Exception actual) {
try {
Assert.assertEquals(message, expected, actual.getClass());
} catch (AssertionError e) {
e.addSuppressed(actual);
throw e;
}
}
}
}