| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.pig.piggybank.test.storage.avro; |
| |
| import org.apache.avro.Schema; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.pig.piggybank.storage.avro.AvroStorageUtils; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Set; |
| |
| import static org.junit.Assert.*; |
| |
| public class TestAvroStorageUtils { |
| |
| // Common elements of test records |
| private final String TYPE_RECORD = "{ \"type\" : \"record\", "; |
| private final String NAME_NODE = "\"name\": \"Node\" , "; |
| private final String FIELDS_VALUE = " \"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, "; |
| |
| public final String RECORD_BEGINNING = TYPE_RECORD + NAME_NODE + FIELDS_VALUE; |
| |
| @Test |
| public void testGenericUnion() throws IOException { |
| |
| final String str1 = "[ \"string\", \"int\", \"boolean\" ]"; |
| Schema s = Schema.parse(str1); |
| assertTrue(AvroStorageUtils.containsGenericUnion(s)); |
| |
| final String str2 = "[ \"string\", \"int\", \"null\" ]"; |
| s = Schema.parse(str2); |
| assertTrue(AvroStorageUtils.containsGenericUnion(s)); |
| |
| final String str3 = "[ \"string\", \"null\" ]"; |
| s = Schema.parse(str3); |
| assertFalse(AvroStorageUtils.containsGenericUnion(s)); |
| Schema realSchema = AvroStorageUtils.getAcceptedType(s); |
| assertEquals(AvroStorageUtils.StringSchema, realSchema); |
| |
| final String str4 = "{\"type\": \"array\", \"items\": " + str2 + "}"; |
| s = Schema.parse(str4); |
| assertTrue(AvroStorageUtils.containsGenericUnion(s)); |
| try { |
| realSchema = AvroStorageUtils.getAcceptedType(s); |
| fail("\"Should throw a runtime exception when trying to get accepted type from a unacceptable union"); |
| } catch (Exception e) { |
| assertTrue(e.getMessage().contains("Cannot call this function on a unacceptable union")); |
| } |
| |
| final String str5 = RECORD_BEGINNING + |
| "{ \"name\": \"next\", \"type\": [\"null\", \"int\"] } ] }"; |
| s = Schema.parse(str5); |
| assertFalse(AvroStorageUtils.containsGenericUnion(s)); |
| |
| final String str6 = RECORD_BEGINNING + |
| "{ \"name\": \"next\", \"type\": [\"string\", \"int\"] } ] }"; |
| s = Schema.parse(str6); |
| assertTrue(AvroStorageUtils.containsGenericUnion(s)); |
| |
| final String str7 = "[ \"string\" ]"; /*union with one type*/ |
| s = Schema.parse(str7); |
| assertFalse(AvroStorageUtils.containsGenericUnion(s)); |
| realSchema = AvroStorageUtils.getAcceptedType(s); |
| assertEquals(AvroStorageUtils.StringSchema, realSchema); |
| |
| final String str8 = "[ ]"; /*union with no type*/ |
| s = Schema.parse(str8); |
| assertFalse(AvroStorageUtils.containsGenericUnion(s)); |
| realSchema = AvroStorageUtils.getAcceptedType(s); |
| assertNull(realSchema); |
| } |
| |
| // test merging null and non-null |
| @Test |
| public void testMergeSchema1() throws IOException { |
| Schema nonNull = Schema.create(Schema.Type.INT); |
| assertEquals(AvroStorageUtils.mergeSchema(null, null), null); |
| assertEquals(AvroStorageUtils.mergeSchema(null, nonNull), nonNull); |
| assertEquals(AvroStorageUtils.mergeSchema(nonNull, null), nonNull); |
| } |
| |
| // test merging primitive types |
| @Test |
| public void testMergeSchema2() throws IOException { |
| Schema.Type primitiveType[] = { |
| Schema.Type.NULL, |
| Schema.Type.BOOLEAN, |
| Schema.Type.BYTES, |
| Schema.Type.INT, |
| Schema.Type.LONG, |
| Schema.Type.FLOAT, |
| Schema.Type.DOUBLE, |
| Schema.Type.ENUM, |
| Schema.Type.STRING, |
| }; |
| |
| // First index is type1, and second index is type2. Note that Schema.Type.NULL |
| // means the Avro null type while null means undefined. |
| Schema.Type expectedType[][] = { |
| { |
| Schema.Type.NULL, // = null + null |
| null, // = null + boolean |
| null, // = null + bytes |
| null, // = null + int |
| null, // = null + long |
| null, // = null + float |
| null, // = null + double |
| null, // = null + enum |
| null, // = null + string |
| }, |
| { |
| null, // = boolean + null |
| Schema.Type.BOOLEAN, // = boolean + boolean |
| null, // = boolean + bytes |
| null, // = boolean + int |
| null, // = boolean + long |
| null, // = boolean + float |
| null, // = boolean + double |
| null, // = boolean + enum |
| null, // = boolean + string |
| }, |
| { |
| null, // = bytes + null |
| null, // = bytes + boolean |
| Schema.Type.BYTES, // = bytes + bytes |
| null, // = bytes + int |
| null, // = bytes + long |
| null, // = bytes + float |
| null, // = bytes + double |
| null, // = bytes + enum |
| null, // = bytes + string |
| }, |
| { |
| null, // = int + null |
| null, // = int + boolean |
| null, // = int + bytes |
| Schema.Type.INT, // = int + int |
| Schema.Type.LONG, // = int + long |
| Schema.Type.FLOAT, // = int + float |
| Schema.Type.DOUBLE, // = int + double |
| Schema.Type.STRING, // = int + enum |
| Schema.Type.STRING, // = int + string |
| }, |
| { |
| null, // = long + null |
| null, // = long + boolean |
| null, // = long + bytes |
| Schema.Type.LONG, // = long + int |
| Schema.Type.LONG, // = long + long |
| Schema.Type.FLOAT, // = long + float |
| Schema.Type.DOUBLE, // = long + double |
| Schema.Type.STRING, // = long + enum |
| Schema.Type.STRING, // = long + string |
| }, |
| { |
| null, // = float + null |
| null, // = float + boolean |
| null, // = float + bytes |
| Schema.Type.FLOAT, // = float + int |
| Schema.Type.FLOAT, // = float + long |
| Schema.Type.FLOAT, // = float + float |
| Schema.Type.DOUBLE, // = float + double |
| Schema.Type.STRING, // = float + enum |
| Schema.Type.STRING, // = float + string |
| }, |
| { |
| null, // = double + null |
| null, // = double + boolean |
| null, // = double + bytes |
| Schema.Type.DOUBLE, // = double + int |
| Schema.Type.DOUBLE, // = double + long |
| Schema.Type.DOUBLE, // = double + float |
| Schema.Type.DOUBLE, // = double + double |
| Schema.Type.STRING, // = double + enum |
| Schema.Type.STRING, // = double + string |
| }, |
| { |
| null, // = enum + null |
| null, // = enum + boolean |
| null, // = enum + bytes |
| Schema.Type.STRING, // = enum + int |
| Schema.Type.STRING, // = enum + long |
| Schema.Type.STRING, // = enum + float |
| Schema.Type.STRING, // = enum + double |
| Schema.Type.ENUM, // = enum + enum |
| Schema.Type.STRING, // = enum + string |
| }, |
| { |
| null, // = string + null |
| null, // = string + boolean |
| null, // = string + bytes |
| Schema.Type.STRING, // = string + int |
| Schema.Type.STRING, // = string + long |
| Schema.Type.STRING, // = string + float |
| Schema.Type.STRING, // = string + double |
| Schema.Type.STRING, // = string + enum |
| Schema.Type.STRING, // = string + string |
| }, |
| }; |
| |
| List<String> enumSymbols = new ArrayList<String>(); |
| enumSymbols.add("sym1"); |
| enumSymbols.add("sym2"); |
| Schema enumSchema = Schema.createEnum("enum", null, null, enumSymbols); |
| |
| for (int i = 0; i < primitiveType.length; i++) { |
| Schema.Type x = primitiveType[i]; |
| for (int j = 0; j < primitiveType.length; j++) { |
| Schema.Type y = primitiveType[j]; |
| if (expectedType[i][j] == null) { |
| try { |
| Schema z = AvroStorageUtils.mergeSchema( |
| x.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(x), |
| y.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(y)); |
| Assert.fail("exception is expected, but " + z.getType() + " is returned"); |
| } catch (IOException e) { |
| assertEquals("Cannot merge "+ x +" with " +y, e.getMessage()); |
| } |
| } else { |
| Schema z = AvroStorageUtils.mergeSchema( |
| x.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(x), |
| y.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(y)); |
| assertEquals(expectedType[i][j], z.getType()); |
| } |
| } |
| } |
| } |
| |
| // test merging different kinds of complex types (negative test) |
| // different kinds of complex types cannot be merged |
| @Test |
| public void testMergeSchema3() throws IOException { |
| Schema complexType[] = { |
| Schema.createRecord(new ArrayList<Schema.Field>()), |
| Schema.createArray(Schema.create(Schema.Type.INT)), |
| Schema.createMap(Schema.create(Schema.Type.INT)), |
| Schema.createUnion(new ArrayList<Schema>()), |
| Schema.createFixed("fixed", null, null, 1), |
| }; |
| |
| for (int i = 0; i < complexType.length; i++) { |
| Schema x = complexType[i]; |
| for (int j = 0; j < complexType.length; j++) { |
| Schema y = complexType[j]; |
| if (i != j) { |
| try { |
| Schema z = AvroStorageUtils.mergeSchema(x, y); |
| Assert.fail("exception is expected, but " + z.getType() + " is returned"); |
| } catch (IOException e) { |
| assertEquals("Cannot merge "+ x.getType()+ " with "+ y.getType(), e.getMessage()); |
| } |
| } |
| } |
| } |
| } |
| |
| // test merging two records |
| @Test |
| public void testMergeSchema4() throws IOException { |
| Schema x, y, z; |
| |
| // fields have the same names and types |
| List<Schema.Field> fields1 = new ArrayList<Schema.Field>(); |
| fields1.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null)); |
| x = Schema.createRecord(fields1); |
| |
| List<Schema.Field> fields2 = new ArrayList<Schema.Field>(); |
| fields2.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null)); |
| y = Schema.createRecord(fields2); |
| |
| z = AvroStorageUtils.mergeSchema(x, y); |
| assertEquals(x.getFields().size(), z.getFields().size()); |
| assertEquals(Schema.Type.INT, z.getField("f1").schema().getType()); |
| |
| // fields have the same names and mergeable types |
| List<Schema.Field> fields3 = new ArrayList<Schema.Field>(); |
| fields3.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null)); |
| x = Schema.createRecord(fields3); |
| |
| List<Schema.Field> fields4 = new ArrayList<Schema.Field>(); |
| fields4.add(new Schema.Field("f1", Schema.create(Schema.Type.DOUBLE), null, null)); |
| y = Schema.createRecord(fields4); |
| |
| z = AvroStorageUtils.mergeSchema(x, y); |
| assertEquals(x.getFields().size(), z.getFields().size()); |
| assertEquals(Schema.Type.DOUBLE, z.getField("f1").schema().getType()); |
| |
| // fields have the same names but not mergeable types |
| List<Schema.Field> fields5 = new ArrayList<Schema.Field>(); |
| fields5.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null)); |
| x = Schema.createRecord(fields5); |
| |
| List<Schema.Field> fields6 = new ArrayList<Schema.Field>(); |
| fields6.add(new Schema.Field("f1", Schema.create(Schema.Type.BOOLEAN), null, null)); |
| y = Schema.createRecord(fields6); |
| |
| try { |
| z = AvroStorageUtils.mergeSchema(x, y); |
| Assert.fail("exception is expected, but " + z.getType() + " is returned"); |
| } catch (IOException e) { |
| assertEquals("Cannot merge "+ x.getField("f1").schema().getType() + |
| " with "+ y.getField("f1").schema().getType(), e.getMessage()); |
| } |
| |
| // fields have different names |
| List<Schema.Field> fields7 = new ArrayList<Schema.Field>(); |
| fields7.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null)); |
| x = Schema.createRecord(fields7); |
| |
| List<Schema.Field> fields8 = new ArrayList<Schema.Field>(); |
| fields8.add(new Schema.Field("f2", Schema.create(Schema.Type.DOUBLE), null, null)); |
| y = Schema.createRecord(fields8); |
| |
| z = AvroStorageUtils.mergeSchema(x, y); |
| assertEquals(x.getFields().size() + y.getFields().size(), z.getFields().size()); |
| assertEquals(Schema.Type.INT, z.getField("f1").schema().getType()); |
| assertEquals(Schema.Type.DOUBLE, z.getField("f2").schema().getType()); |
| } |
| |
| // test merging two maps |
| @Test |
| public void testMergeSchema5() throws IOException { |
| Schema x, y, z; |
| |
| // element types are mergeable |
| x = Schema.createArray(Schema.create(Schema.Type.INT)); |
| y = Schema.createArray(Schema.create(Schema.Type.DOUBLE)); |
| |
| z = AvroStorageUtils.mergeSchema(x, y); |
| assertEquals(Schema.Type.ARRAY, z.getType()); |
| assertEquals(Schema.Type.DOUBLE, z.getElementType().getType()); |
| |
| // element types are not mergeable |
| x = Schema.createArray(Schema.create(Schema.Type.INT)); |
| y = Schema.createArray(Schema.create(Schema.Type.BOOLEAN)); |
| |
| try { |
| z = AvroStorageUtils.mergeSchema(x, y); |
| Assert.fail("exception is expected, but " + z.getType() + " is returned"); |
| } catch (IOException e) { |
| assertEquals("Cannot merge "+ x.getElementType().getType() + |
| " with "+ y.getElementType().getType(), e.getMessage()); |
| } |
| } |
| |
| // test merging two maps |
| @Test |
| public void testMergeSchema6() throws IOException { |
| Schema x, y, z; |
| |
| // value types are mergeable |
| x = Schema.createMap(Schema.create(Schema.Type.INT)); |
| y = Schema.createMap(Schema.create(Schema.Type.DOUBLE)); |
| |
| z = AvroStorageUtils.mergeSchema(x, y); |
| assertEquals(Schema.Type.MAP, z.getType()); |
| assertEquals(Schema.Type.DOUBLE, z.getValueType().getType()); |
| |
| // value types are not mergeable |
| x = Schema.createMap(Schema.create(Schema.Type.INT)); |
| y = Schema.createMap(Schema.create(Schema.Type.BOOLEAN)); |
| |
| try { |
| z = AvroStorageUtils.mergeSchema(x, y); |
| Assert.fail("exception is expected, but " + z.getType() + " is returned"); |
| } catch (IOException e) { |
| assertEquals("Cannot merge "+ x.getValueType().getType() + |
| " with "+ y.getValueType().getType(), e.getMessage()); |
| } |
| } |
| |
| // test merging two unions |
| @Test |
| public void testMergeSchema7() throws IOException { |
| Schema x, y, z; |
| |
| List<Schema> types1 = new ArrayList<Schema>(); |
| types1.add(Schema.create(Schema.Type.INT)); |
| List<Schema> types2 = new ArrayList<Schema>(); |
| types2.add(Schema.create(Schema.Type.DOUBLE)); |
| |
| x = Schema.createUnion(types1); |
| y = Schema.createUnion(types2); |
| |
| z = AvroStorageUtils.mergeSchema(x, y); |
| assertEquals(Schema.Type.UNION, z.getType()); |
| assertTrue(z.getTypes().contains(types1.get(0))); |
| assertTrue(z.getTypes().contains(types2.get(0))); |
| } |
| |
| // test merging two fixeds |
| @Test |
| public void testMergeSchema8() throws IOException { |
| Schema x, y, z; |
| |
| x = Schema.createFixed("fixed1", null, null, 1); |
| y = Schema.createFixed("fixed2", null, null, 1); |
| |
| z = AvroStorageUtils.mergeSchema(x, y); |
| assertEquals(Schema.Type.FIXED, z.getType()); |
| assertEquals(x.getFixedSize(), z.getFixedSize()); |
| |
| x = Schema.createFixed("fixed1", null, null, 1); |
| y = Schema.createFixed("fixed2", null, null, 2); |
| |
| try { |
| z = AvroStorageUtils.mergeSchema(x, y); |
| Assert.fail("exception is expected, but " + z.getType() + " is returned"); |
| } catch (IOException e) { |
| assertTrue(e.getMessage().contains("Cannot merge FIXED types with different sizes")); |
| } |
| } |
| } |