blob: 7d7d2b1349b136e8dbb4afb8c75f7168c524755a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.pig.piggybank.test.storage.avro;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.*;
public class TestAvroStorageUtils {
// Common elements of test records
private final String TYPE_RECORD = "{ \"type\" : \"record\", ";
private final String NAME_NODE = "\"name\": \"Node\" , ";
private final String FIELDS_VALUE = " \"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, ";
public final String RECORD_BEGINNING = TYPE_RECORD + NAME_NODE + FIELDS_VALUE;
@Test
public void testGenericUnion() throws IOException {
final String str1 = "[ \"string\", \"int\", \"boolean\" ]";
Schema s = Schema.parse(str1);
assertTrue(AvroStorageUtils.containsGenericUnion(s));
final String str2 = "[ \"string\", \"int\", \"null\" ]";
s = Schema.parse(str2);
assertTrue(AvroStorageUtils.containsGenericUnion(s));
final String str3 = "[ \"string\", \"null\" ]";
s = Schema.parse(str3);
assertFalse(AvroStorageUtils.containsGenericUnion(s));
Schema realSchema = AvroStorageUtils.getAcceptedType(s);
assertEquals(AvroStorageUtils.StringSchema, realSchema);
final String str4 = "{\"type\": \"array\", \"items\": " + str2 + "}";
s = Schema.parse(str4);
assertTrue(AvroStorageUtils.containsGenericUnion(s));
try {
realSchema = AvroStorageUtils.getAcceptedType(s);
fail("\"Should throw a runtime exception when trying to get accepted type from a unacceptable union");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Cannot call this function on a unacceptable union"));
}
final String str5 = RECORD_BEGINNING +
"{ \"name\": \"next\", \"type\": [\"null\", \"int\"] } ] }";
s = Schema.parse(str5);
assertFalse(AvroStorageUtils.containsGenericUnion(s));
final String str6 = RECORD_BEGINNING +
"{ \"name\": \"next\", \"type\": [\"string\", \"int\"] } ] }";
s = Schema.parse(str6);
assertTrue(AvroStorageUtils.containsGenericUnion(s));
final String str7 = "[ \"string\" ]"; /*union with one type*/
s = Schema.parse(str7);
assertFalse(AvroStorageUtils.containsGenericUnion(s));
realSchema = AvroStorageUtils.getAcceptedType(s);
assertEquals(AvroStorageUtils.StringSchema, realSchema);
final String str8 = "[ ]"; /*union with no type*/
s = Schema.parse(str8);
assertFalse(AvroStorageUtils.containsGenericUnion(s));
realSchema = AvroStorageUtils.getAcceptedType(s);
assertNull(realSchema);
}
// test merging null and non-null
@Test
public void testMergeSchema1() throws IOException {
Schema nonNull = Schema.create(Schema.Type.INT);
assertEquals(AvroStorageUtils.mergeSchema(null, null), null);
assertEquals(AvroStorageUtils.mergeSchema(null, nonNull), nonNull);
assertEquals(AvroStorageUtils.mergeSchema(nonNull, null), nonNull);
}
// test merging primitive types
@Test
public void testMergeSchema2() throws IOException {
Schema.Type primitiveType[] = {
Schema.Type.NULL,
Schema.Type.BOOLEAN,
Schema.Type.BYTES,
Schema.Type.INT,
Schema.Type.LONG,
Schema.Type.FLOAT,
Schema.Type.DOUBLE,
Schema.Type.ENUM,
Schema.Type.STRING,
};
// First index is type1, and second index is type2. Note that Schema.Type.NULL
// means the Avro null type while null means undefined.
Schema.Type expectedType[][] = {
{
Schema.Type.NULL, // = null + null
null, // = null + boolean
null, // = null + bytes
null, // = null + int
null, // = null + long
null, // = null + float
null, // = null + double
null, // = null + enum
null, // = null + string
},
{
null, // = boolean + null
Schema.Type.BOOLEAN, // = boolean + boolean
null, // = boolean + bytes
null, // = boolean + int
null, // = boolean + long
null, // = boolean + float
null, // = boolean + double
null, // = boolean + enum
null, // = boolean + string
},
{
null, // = bytes + null
null, // = bytes + boolean
Schema.Type.BYTES, // = bytes + bytes
null, // = bytes + int
null, // = bytes + long
null, // = bytes + float
null, // = bytes + double
null, // = bytes + enum
null, // = bytes + string
},
{
null, // = int + null
null, // = int + boolean
null, // = int + bytes
Schema.Type.INT, // = int + int
Schema.Type.LONG, // = int + long
Schema.Type.FLOAT, // = int + float
Schema.Type.DOUBLE, // = int + double
Schema.Type.STRING, // = int + enum
Schema.Type.STRING, // = int + string
},
{
null, // = long + null
null, // = long + boolean
null, // = long + bytes
Schema.Type.LONG, // = long + int
Schema.Type.LONG, // = long + long
Schema.Type.FLOAT, // = long + float
Schema.Type.DOUBLE, // = long + double
Schema.Type.STRING, // = long + enum
Schema.Type.STRING, // = long + string
},
{
null, // = float + null
null, // = float + boolean
null, // = float + bytes
Schema.Type.FLOAT, // = float + int
Schema.Type.FLOAT, // = float + long
Schema.Type.FLOAT, // = float + float
Schema.Type.DOUBLE, // = float + double
Schema.Type.STRING, // = float + enum
Schema.Type.STRING, // = float + string
},
{
null, // = double + null
null, // = double + boolean
null, // = double + bytes
Schema.Type.DOUBLE, // = double + int
Schema.Type.DOUBLE, // = double + long
Schema.Type.DOUBLE, // = double + float
Schema.Type.DOUBLE, // = double + double
Schema.Type.STRING, // = double + enum
Schema.Type.STRING, // = double + string
},
{
null, // = enum + null
null, // = enum + boolean
null, // = enum + bytes
Schema.Type.STRING, // = enum + int
Schema.Type.STRING, // = enum + long
Schema.Type.STRING, // = enum + float
Schema.Type.STRING, // = enum + double
Schema.Type.ENUM, // = enum + enum
Schema.Type.STRING, // = enum + string
},
{
null, // = string + null
null, // = string + boolean
null, // = string + bytes
Schema.Type.STRING, // = string + int
Schema.Type.STRING, // = string + long
Schema.Type.STRING, // = string + float
Schema.Type.STRING, // = string + double
Schema.Type.STRING, // = string + enum
Schema.Type.STRING, // = string + string
},
};
List<String> enumSymbols = new ArrayList<String>();
enumSymbols.add("sym1");
enumSymbols.add("sym2");
Schema enumSchema = Schema.createEnum("enum", null, null, enumSymbols);
for (int i = 0; i < primitiveType.length; i++) {
Schema.Type x = primitiveType[i];
for (int j = 0; j < primitiveType.length; j++) {
Schema.Type y = primitiveType[j];
if (expectedType[i][j] == null) {
try {
Schema z = AvroStorageUtils.mergeSchema(
x.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(x),
y.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(y));
Assert.fail("exception is expected, but " + z.getType() + " is returned");
} catch (IOException e) {
assertEquals("Cannot merge "+ x +" with " +y, e.getMessage());
}
} else {
Schema z = AvroStorageUtils.mergeSchema(
x.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(x),
y.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(y));
assertEquals(expectedType[i][j], z.getType());
}
}
}
}
// test merging different kinds of complex types (negative test)
// different kinds of complex types cannot be merged
@Test
public void testMergeSchema3() throws IOException {
Schema complexType[] = {
Schema.createRecord(new ArrayList<Schema.Field>()),
Schema.createArray(Schema.create(Schema.Type.INT)),
Schema.createMap(Schema.create(Schema.Type.INT)),
Schema.createUnion(new ArrayList<Schema>()),
Schema.createFixed("fixed", null, null, 1),
};
for (int i = 0; i < complexType.length; i++) {
Schema x = complexType[i];
for (int j = 0; j < complexType.length; j++) {
Schema y = complexType[j];
if (i != j) {
try {
Schema z = AvroStorageUtils.mergeSchema(x, y);
Assert.fail("exception is expected, but " + z.getType() + " is returned");
} catch (IOException e) {
assertEquals("Cannot merge "+ x.getType()+ " with "+ y.getType(), e.getMessage());
}
}
}
}
}
// test merging two records
@Test
public void testMergeSchema4() throws IOException {
Schema x, y, z;
// fields have the same names and types
List<Schema.Field> fields1 = new ArrayList<Schema.Field>();
fields1.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
x = Schema.createRecord(fields1);
List<Schema.Field> fields2 = new ArrayList<Schema.Field>();
fields2.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
y = Schema.createRecord(fields2);
z = AvroStorageUtils.mergeSchema(x, y);
assertEquals(x.getFields().size(), z.getFields().size());
assertEquals(Schema.Type.INT, z.getField("f1").schema().getType());
// fields have the same names and mergeable types
List<Schema.Field> fields3 = new ArrayList<Schema.Field>();
fields3.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
x = Schema.createRecord(fields3);
List<Schema.Field> fields4 = new ArrayList<Schema.Field>();
fields4.add(new Schema.Field("f1", Schema.create(Schema.Type.DOUBLE), null, null));
y = Schema.createRecord(fields4);
z = AvroStorageUtils.mergeSchema(x, y);
assertEquals(x.getFields().size(), z.getFields().size());
assertEquals(Schema.Type.DOUBLE, z.getField("f1").schema().getType());
// fields have the same names but not mergeable types
List<Schema.Field> fields5 = new ArrayList<Schema.Field>();
fields5.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
x = Schema.createRecord(fields5);
List<Schema.Field> fields6 = new ArrayList<Schema.Field>();
fields6.add(new Schema.Field("f1", Schema.create(Schema.Type.BOOLEAN), null, null));
y = Schema.createRecord(fields6);
try {
z = AvroStorageUtils.mergeSchema(x, y);
Assert.fail("exception is expected, but " + z.getType() + " is returned");
} catch (IOException e) {
assertEquals("Cannot merge "+ x.getField("f1").schema().getType() +
" with "+ y.getField("f1").schema().getType(), e.getMessage());
}
// fields have different names
List<Schema.Field> fields7 = new ArrayList<Schema.Field>();
fields7.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
x = Schema.createRecord(fields7);
List<Schema.Field> fields8 = new ArrayList<Schema.Field>();
fields8.add(new Schema.Field("f2", Schema.create(Schema.Type.DOUBLE), null, null));
y = Schema.createRecord(fields8);
z = AvroStorageUtils.mergeSchema(x, y);
assertEquals(x.getFields().size() + y.getFields().size(), z.getFields().size());
assertEquals(Schema.Type.INT, z.getField("f1").schema().getType());
assertEquals(Schema.Type.DOUBLE, z.getField("f2").schema().getType());
}
// test merging two maps
@Test
public void testMergeSchema5() throws IOException {
Schema x, y, z;
// element types are mergeable
x = Schema.createArray(Schema.create(Schema.Type.INT));
y = Schema.createArray(Schema.create(Schema.Type.DOUBLE));
z = AvroStorageUtils.mergeSchema(x, y);
assertEquals(Schema.Type.ARRAY, z.getType());
assertEquals(Schema.Type.DOUBLE, z.getElementType().getType());
// element types are not mergeable
x = Schema.createArray(Schema.create(Schema.Type.INT));
y = Schema.createArray(Schema.create(Schema.Type.BOOLEAN));
try {
z = AvroStorageUtils.mergeSchema(x, y);
Assert.fail("exception is expected, but " + z.getType() + " is returned");
} catch (IOException e) {
assertEquals("Cannot merge "+ x.getElementType().getType() +
" with "+ y.getElementType().getType(), e.getMessage());
}
}
// test merging two maps
@Test
public void testMergeSchema6() throws IOException {
Schema x, y, z;
// value types are mergeable
x = Schema.createMap(Schema.create(Schema.Type.INT));
y = Schema.createMap(Schema.create(Schema.Type.DOUBLE));
z = AvroStorageUtils.mergeSchema(x, y);
assertEquals(Schema.Type.MAP, z.getType());
assertEquals(Schema.Type.DOUBLE, z.getValueType().getType());
// value types are not mergeable
x = Schema.createMap(Schema.create(Schema.Type.INT));
y = Schema.createMap(Schema.create(Schema.Type.BOOLEAN));
try {
z = AvroStorageUtils.mergeSchema(x, y);
Assert.fail("exception is expected, but " + z.getType() + " is returned");
} catch (IOException e) {
assertEquals("Cannot merge "+ x.getValueType().getType() +
" with "+ y.getValueType().getType(), e.getMessage());
}
}
// test merging two unions
@Test
public void testMergeSchema7() throws IOException {
Schema x, y, z;
List<Schema> types1 = new ArrayList<Schema>();
types1.add(Schema.create(Schema.Type.INT));
List<Schema> types2 = new ArrayList<Schema>();
types2.add(Schema.create(Schema.Type.DOUBLE));
x = Schema.createUnion(types1);
y = Schema.createUnion(types2);
z = AvroStorageUtils.mergeSchema(x, y);
assertEquals(Schema.Type.UNION, z.getType());
assertTrue(z.getTypes().contains(types1.get(0)));
assertTrue(z.getTypes().contains(types2.get(0)));
}
// test merging two fixeds
@Test
public void testMergeSchema8() throws IOException {
Schema x, y, z;
x = Schema.createFixed("fixed1", null, null, 1);
y = Schema.createFixed("fixed2", null, null, 1);
z = AvroStorageUtils.mergeSchema(x, y);
assertEquals(Schema.Type.FIXED, z.getType());
assertEquals(x.getFixedSize(), z.getFixedSize());
x = Schema.createFixed("fixed1", null, null, 1);
y = Schema.createFixed("fixed2", null, null, 2);
try {
z = AvroStorageUtils.mergeSchema(x, y);
Assert.fail("exception is expected, but " + z.getType() + " is returned");
} catch (IOException e) {
assertTrue(e.getMessage().contains("Cannot merge FIXED types with different sizes"));
}
}
}