blob: 60081c334f9dd8b04249da2d2e9d5b72751784a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.junit.Assert;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Tests for {@code UNION ONSCHEMA}, which combines its inputs by column name
 * (filling columns missing from an input with null) instead of by position.
 * Input data lives in small local files created once per class in
 * {@link #oneTimeSetup()} and removed in {@link #oneTimeTearDown()}.
 */
public class TestUnionOnSchema {
    private static final String INP_FILE_2NUMS = "TestUnionOnSchemaInput1";
    private static final String INP_FILE_2NUM_1CHAR_1BAG = "TestUnionOnSchemaInput2";
    private static final String INP_FILE_EMPTY = "TestUnionOnSchemaInput3";
    private static final String INP_FILE_3NUMS = "TestUnionOnSchemaInput4";

    /** Every file written by {@link #oneTimeSetup()}; all of them must be cleaned up. */
    private static final String[] ALL_INPUT_FILES = {
        INP_FILE_2NUMS, INP_FILE_2NUM_1CHAR_1BAG, INP_FILE_EMPTY, INP_FILE_3NUMS
    };

    /**
     * Resets the {@link FileLocalizer} initialization flag before each test so
     * that every test starts from the same localizer state.
     */
    @Before
    public void setUp() throws Exception {
        FileLocalizer.setInitialized(false);
    }

    /** No per-test cleanup is currently needed. */
    @After
    public void tearDown() throws Exception {
    }

    /**
     * Creates the local input files shared by all tests in this class.
     */
    @BeforeClass
    public static void oneTimeSetup() throws Exception {
        // first input file: two tab-separated ints per line
        String[] input1 = {"1\t2", "5\t3"};
        Util.createLocalInputFile(INP_FILE_2NUMS, input1);

        // 2nd input file: int, chararray, int, bag, tuple
        String[] input2 = {
            "1\tabc\t2\t{(1,a),(1,b)}\t(1,c)",
            "5\tdef\t3\t{(2,a),(2,b)}\t(2,c)"
        };
        Util.createLocalInputFile(INP_FILE_2NUM_1CHAR_1BAG, input2);

        // 3rd input - empty file
        Util.createLocalInputFile(INP_FILE_EMPTY, new String[0]);

        // 4th input: three tab-separated ints per line
        String[] input4 = {"1\t2\t3", "4\t5\t6",};
        Util.createLocalInputFile(INP_FILE_3NUMS, input4);
    }

    /**
     * Deletes every input file created by {@link #oneTimeSetup()}, including
     * {@code INP_FILE_3NUMS}, which was previously left behind.
     */
    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        for (String fileName : ALL_INPUT_FILES) {
            new File(fileName).delete();
        }
    }

    /**
     * Test UNION ONSCHEMA on two inputs with same schema
     */
    @Test
    public void testUnionOnSchemaSameSchema() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
            + "u = union onschema l1, l2;"
            ;
        Util.registerMultiLineQuery(pig, query);

        Schema expectedSch = Utils.getSchemaFromString("i: int, j: int");
        Schema sch = pig.dumpSchema("u");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,2)",
                    "(5,3)",
                    "(1,2)",
                    "(5,3)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA followed by a filter that references columns
     * present in only one of the inputs
     */
    @Test
    public void testUnionOnSchemaFilter() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, x : int);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
            + "u = union onschema l1, l2;"
            + "fil = filter u by i == 5 and (x is null or x != 1);"
            ;
        Util.registerMultiLineQuery(pig, query);

        Schema sch = pig.dumpSchema("fil");
        Schema expectedSch = Utils.getSchemaFromString("i: int, x: int, j: int");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("fil");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(5,null,3)",
                    "(5,3,null)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA followed by a chain of operations
     * (order, limit, filter) on the union output
     */
    @Test
    public void testUnionOnSchemaSuccOps() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : int, y : int);"
            + "u = union onschema l1, l2;"
            + "o = order u by i desc;"
            + "lim = limit o 2;"
            + "fil = filter lim by i == 5 and y is null;"
            ;
        Util.registerMultiLineQuery(pig, query);

        Schema sch = pig.dumpSchema("fil");
        Schema expectedSch = Utils.getSchemaFromString("i: int, x: int, y: int");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("fil");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(5,null,null)",
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA with cast from bytearray to another type
     */
    @Test
    public void testUnionOnSchemaCastOnByteArray() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i, j);"
            + " f1 = foreach l1 generate (int)i, (int)j;"
            + "u = union onschema f1, l1;"
            ;
        Util.registerMultiLineQuery(pig, query);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,2)",
                    "(5,3)",
                    "(1,2)",
                    "(5,3)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA where a common column has additional 'namespace' part
     * in the column name in one of the inputs
     */
    @Test
    public void testUnionOnSchemaScopedColumnName() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query_prefix =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
            + "g = group l1 by i; "
            + "f = foreach g generate flatten(l1); "
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); ";

        String query = query_prefix + "u = union onschema f, l2; " ;
        Util.registerMultiLineQuery(pig, query);
        Schema sch = pig.dumpSchema("u");
        Schema expectedSch = Utils.getSchemaFromString("i: int, j: int");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,2)",
                    "(5,3)",
                    "(1,2)",
                    "(5,3)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

        // now try reversing the order of relation
        query = query_prefix + "u = union onschema l2, f; " ;
        Util.registerMultiLineQuery(pig, query);
        sch = pig.dumpSchema("u");
        expectedSch = Utils.getSchemaFromString("i: int, j: int");
        assertEquals("Checking expected schema", expectedSch, sch);

        it = pig.openIterator("u");
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA where a common column has additional 'namespace' part
     * in the column name in both the inputs
     */
    @Test
    public void testUnionOnSchemaScopedColumnNameBothInp1() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
            + "g1 = group l1 by i; "
            + "f1 = foreach g1 generate group as gkey, flatten(l1); "
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : chararray); "
            + "g2 = group l2 by i; "
            + "f2 = foreach g2 generate group as gkey, flatten(l2); "
            + "u = union onschema f1, f2; " ;
        Util.registerMultiLineQuery(pig, query);

        Schema sch = pig.dumpSchema("u");
        Schema expectedSch =
            Utils.getSchemaFromString("gkey: int, l1::i: int, l1::j: int, l2::i: int, l2::x: chararray");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,1,2,null,null)",
                    "(5,5,3,null,null)",
                    "(1,null,null,1,'2')",
                    "(5,null,null,5,'3')"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA where a common column has additional 'namespace' part
     * in the column name in both the inputs. The two inputs come from cogroups
     * with opposite input order, so their flattened columns appear in
     * different positions but must still line up by name.
     */
    @Test
    public void testUnionOnSchemaScopedColumnNameBothInp2() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
            + " l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : chararray); "
            + " cg1 = cogroup l1 by i, l2 by i; "
            + " f1 = foreach cg1 generate group as gkey, flatten(l1), flatten(l2); "
            + " cg2 = cogroup l2 by i, l1 by i; "
            // f2 must be built from cg2 (the reversed cogroup); using cg1 here
            // would leave cg2 dead and skip the intended scenario.
            + " f2 = foreach cg2 generate group as gkey, flatten(l2), flatten(l1); "
            + "u = union onschema f1, f2; " ;
        Util.registerMultiLineQuery(pig, query);

        Schema sch = pig.dumpSchema("u");
        Schema expectedSch =
            Utils.getSchemaFromString("gkey: int, l1::i: int, l1::j: int, l2::i: int, l2::x: chararray");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,1,2,1,'2')",
                    "(5,5,3,5,'3')",
                    "(1,1,2,1,'2')",
                    "(5,5,3,5,'3')",
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA where a common column has additional 'namespace' part
     * in the column name in one of the inputs.
     * Negative test case: the unscoped name matches more than one column.
     */
    @Test
    public void testUnionOnSchemaScopedColumnNameNeg() throws Exception {
        String expectedErr = "Found more than one match: l1::i, l2::i";
        String query_prefix =
            " l1 = load '/tmp/fn' as (i : int, j : long); "
            + "l2 = load '/tmp/fn' as (i : int, j : long); "
            + "cg = cogroup l1 by i, l2 by i;"
            + "f = foreach cg generate flatten(l1), flatten(l2); "
            + "l3 = load '/tmp/fn2' as (i : int, j : long); "
            ;
        String query = query_prefix + "u = union onschema f, l3; ";
        checkSchemaEx(query, expectedErr);

        // now try reversing the order of relation
        query = query_prefix + "u = union onschema l3, f; ";
        checkSchemaEx(query, expectedErr);
    }

    /**
     * Test UNION ONSCHEMA on two inputs with same column names, but different
     * numeric types - test type promotion
     */
    @Test
    public void testUnionOnSchemaDiffNumType() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : double);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : float);"
            + "u = union onschema l1, l2;"
            ;
        Util.registerMultiLineQuery(pig, query);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1L,2.0)",
                    "(5L,3.0)",
                    "(1L,2.0)",
                    "(5L,3.0)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA on two inputs with no common columns
     */
    @Test
    public void testUnionOnSchemaNoCommonCols() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
            + "u = union onschema l1, l2;"
            ;
        Util.registerMultiLineQuery(pig, query);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,2,null,null)",
                    "(5,3,null,null)",
                    "(null,null,1L,2.0F)",
                    "(null,null,5L,3.0F)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA on three inputs: one with additional columns
     * (including bag and tuple columns) and one empty input
     */
    @Test
    public void testUnionOnSchemaAdditionalColumn() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
            + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : long, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
            + ", t : tuple (tc1 : int, tc2 : chararray) );"
            + "l3 = load '" + INP_FILE_EMPTY + "' as (i : int, x : long);"
            + "u = union onschema l1, l2, l3;"
            ;
        Util.registerMultiLineQuery(pig, query);
        pig.explain("u", System.out);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1L,2,null,null,null,null)",
                    "(5L,3,null,null,null,null)",
                    "(1L,2,'abc',{(1,'a'),(1,'b')},(1,'c'),null)",
                    "(5L,3,'def',{(2,'a'),(2,'b')},(2,'c'),null)",
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA with additional columns when one input (l3) is also
     * stored on its own, so the batch plan contains an implicit split on l3.
     * Checks both the union output and the (empty) direct store of l3.
     */
    @Test
    public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        Data data = Storage.resetData(pig);

        // Use batch to force multiple outputs from relation l3. This causes
        // ImplicitSplitInsertVisitor to call SchemaResetter.
        pig.setBatchOn();

        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j: int);"
            + "l2 = load '" + INP_FILE_3NUMS + "' as (i : int, j : int, k : int);"
            + "l3 = load '" + INP_FILE_EMPTY + "' as (i : int, j : int, k : int, l :int);"
            + "u = union onschema l1, l2, l3;"
            + "store u into 'out1' using mock.Storage;"
            + "store l3 into 'out2' using mock.Storage;"
            ;
        Util.registerMultiLineQuery(pig, query);
        pig.executeBatch();

        List<Tuple> list1 = data.get("out1");
        List<Tuple> list2 = data.get("out2");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,2,null,null)",
                    "(5,3,null,null)",
                    "(1,2,3,null)",
                    "(4,5,6,null)",
                });
        Util.checkQueryOutputsAfterSort(list1, expectedRes);
        assertEquals(0, list2.size());
    }

    /**
     * Test UNION ONSCHEMA on 2 inputs whose bag column has an inner schema,
     * followed by a nested foreach that references an inner bag column
     */
    @Test
    public void testUnionOnSchemaInnerSchema() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : long, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
            + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : long, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
            + "u = union onschema l1, l2; "
            // The addition in the inner foreach will fail if the inner schema's uids
            // are all set to -1, since the code that finds the inner load's schema will
            // match the last item in b's schema, which is a chararray
            + "p = foreach u { x = foreach b GENERATE c1 + 5 as c3; GENERATE i, c, x; }";
        Util.registerMultiLineQuery(pig, query);
        pig.explain("p", System.out);

        Iterator<Tuple> it = pig.openIterator("p");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1L,'abc',{(6),(6)})",
                    "(5L,'def',{(7),(7)})",
                    "(1L,'abc',{(6),(6)})",
                    "(5L,'def',{(7),(7)})"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA on 3 inputs with partially overlapping columns
     * and numeric type promotion of the common column
     */
    @Test
    public void testUnionOnSchema3Inputs() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : double, x : int); "
            + "l3 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : long, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
            + "u = union onschema l1, l2, l3;"
            ;
        Util.registerMultiLineQuery(pig, query);
        pig.explain("u", System.out);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1.0,2,null,null,null)",
                    "(5.0,3,null,null,null)",
                    "(1.0,null,2,null,null)",
                    "(5.0,null,3,null,null)",
                    "(1.0,2,null,'abc',{(1,'a'),(1,'b')})",
                    "(5.0,3,null,'def',{(2,'a'),(2,'b')})",
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA with bytearray type
     */
    @Test
    public void testUnionOnSchemaByteArrayConversions() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : bytearray, x : bytearray, j : bytearray "
            + ", b : bytearray); "
            + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : long, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
            + "u = union onSchema l1, l2;"
            ;
        Util.registerMultiLineQuery(pig, query);
        pig.explain("u", System.out);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1L,null,2,{(1,'a'),(1,'b')},'abc')",
                    "(1L,'abc',2,{(1,'a'),(1,'b')},null)",
                    "(5L,null,3,{(2,'a'),(2,'b')},'def')",
                    "(5L,'def',3,{(2,'a'),(2,'b')},null)",
                });

        // update expectedRes to use bytearray instead of chararray in 2nd field
        for (Tuple t : expectedRes) {
            if (t.get(1) != null) {
                t.set(1, new DataByteArray(t.get(1).toString()));
            }
        }
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * negative test - test error on no schema
     */
    @Test
    public void testUnionOnSchemaNoSchema() throws Exception {
        String expectedErr = "UNION ONSCHEMA cannot be used with " +
            "relations that have null schema";
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' ;"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
            + "u = union onschema l1, l2;"
            ;
        checkSchemaEx(query, expectedErr);

        query =
            " l1 = load '" + INP_FILE_2NUMS + "' ;"
            + "l2 = load '" + INP_FILE_2NUMS + "' ;"
            + "u = union onschema l1, l2;"
            ;
        checkSchemaEx(query, expectedErr);
    }

    /**
     * negative test - test error on null alias in one of the FieldSchema
     */
    @Test
    public void testUnionOnSchemaNullAliasInFieldSchema() throws Exception {
        String expectedErr = "Schema of relation f has a null fieldschema for " +
            "column(s). Schema ::long,y:float";
        String query =
            " l = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
            + "f = foreach l generate x+1, y;"
            + "u = union onschema l, f;"
            ;
        checkSchemaEx(query, expectedErr);
    }

    /**
     * Registers {@code query} and requests the schema of alias {@code u},
     * asserting that a {@link FrontendException} is thrown whose underlying
     * {@link PigException} message contains {@code expectedErr}.
     *
     * @param query Pig Latin script that defines alias {@code u}
     * @param expectedErr substring expected in the error message
     */
    private void checkSchemaEx(String query, String expectedErr) throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        try {
            Util.registerMultiLineQuery(pig, query);
            pig.dumpSchema("u");
        } catch (FrontendException e) {
            PigException pigEx = LogUtils.getPigException(e);
            if (!pigEx.getMessage().contains(expectedErr)) {
                String msg = "Expected exception message matching '"
                    + expectedErr + "' but got '" + pigEx.getMessage() + "'" ;
                fail(msg);
            }
            // the expected exception was raised with a matching message
            return;
        }
        fail("No exception thrown. Exception is expected.");
    }

    /**
     * test union with incompatible types in schema
     */
    @Test
    public void testUnionOnSchemaIncompatibleTypes() throws Exception {
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
            + "u = union onschema l1, l2;";
        checkSchemaEx(query, "Cannot cast from chararray to bytearray");

        //without "onschema"
        query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
            + "u = union l1, l2;";
        checkSchemaEx(query, "Cannot cast from chararray to bytearray");

        query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : map[ ], y : chararray);"
            + "u = union onschema l1, l2;"
            ;
        checkSchemaEx(query, "Cannot cast from long to bytearray");

        query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : map[ ], y : chararray);"
            + "u = union l1, l2;"
            ;
        checkSchemaEx(query, "Cannot cast from long to bytearray");

        // bag column with different internal column types
        query =
            " l1 = load '" + INP_FILE_2NUMS
            + "' as (x : long, b : bag { t : tuple (c1 : int, c2 : chararray)} );"
            + "l2 = load '" + INP_FILE_2NUMS
            + "' as (x : long, b : bag { t : tuple (c1 : long, c2 : chararray)} );"
            + "u = union onschema l1, l2;"
            ;
        Schema sch = new Schema();
        sch.add(new FieldSchema("x", DataType.LONG));
        Schema bagInnerSchema = new Schema();
        bagInnerSchema.add(new FieldSchema(null, new Schema(), DataType.TUPLE));
        sch.add(new FieldSchema("b", bagInnerSchema, DataType.BAG));
        checkSchemaEquals(query, sch);

        // tuple column with different internal column types
        query =
            " l1 = load '" + INP_FILE_2NUMS
            + "' as (t : tuple (c1 : int, c2 : chararray) );"
            + "l2 = load '" + INP_FILE_2NUMS
            + "' as (t : tuple (c1 : long, c2 : chararray) );"
            + "u = union onschema l1, l2;"
            ;
        sch = new Schema();
        sch.add(new FieldSchema("t", new Schema(), DataType.TUPLE));
        checkSchemaEquals(query, sch);
    }

    /**
     * Registers {@code query} and asserts that the schema of alias {@code u}
     * equals {@code expectedSch}.
     */
    private void checkSchemaEquals(String query, Schema expectedSch) throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        Util.registerMultiLineQuery(pig, query);
        Schema sch = pig.dumpSchema("u");
        assertEquals("Checking expected schema", expectedSch, sch);
    }

    /**
     * Same as {@link #checkSchemaEquals(String, Schema)}, with the expected
     * schema given in Pig schema-string form.
     */
    private void checkSchemaEquals(String query, String schemaStr) throws Exception {
        Schema expectedSch = Utils.getSchemaFromString(schemaStr);
        checkSchemaEquals(query, expectedSch);
    }

    /**
     * Test UNION ONSCHEMA with input relation having udfs
     */
    @Test
    public void testUnionOnSchemaInputUdfs() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
            + "f1 = foreach l1 generate i, CONCAT(j,j) as cj, " +
            "org.apache.pig.test.TestUnionOnSchema\\$UDFTupleNullSchema(i,j) as uo;"
            + "u = union onschema f1, l2;"
            ;
        Util.registerMultiLineQuery(pig, query);

        Schema sch = pig.dumpSchema("u");
        String expectedSch = "{i: int,cj: chararray,uo: (),j: chararray}";
        // assertEquals (rather than assertTrue on equals) reports the actual schema on failure
        assertEquals("Checking expected schema", expectedSch, sch.toString());

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,null,null,'2')",
                    "(5,null,null,'3')",
                    "(1,'22',(1,'2'),null)",
                    "(5,'33',(5,'3'),null)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA with udf whose default type is different from
     * final type
     */
    @Test
    public void testUnionOnSchemaUdfTypeEvolution() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query_prefix =
            " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : int, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
            + ", t : tuple (tc1 : int, tc2 : chararray) );"
            + " l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : int, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
            + ", t : tuple (tc1 : int, tc2 : chararray) );"
            + "f1 = foreach l1 generate i, MAX(b.c1) as mx;"
            + "f2 = foreach l2 generate i, COUNT(b.c1) as mx;"
            ;

        String query = query_prefix + "u = union onschema f1, f2;";
        Util.registerMultiLineQuery(pig, query);
        Schema sch = pig.dumpSchema("u");
        Schema expectedSch =
            Utils.getSchemaFromString("i: int, mx: long");
        assertEquals("Checking expected schema", expectedSch, sch);

        // verify schema for reverse order of relations as well
        query = query_prefix + "u = union onschema f2, f1;";
        Util.registerMultiLineQuery(pig, query);
        sch = pig.dumpSchema("u");
        expectedSch =
            Utils.getSchemaFromString("i: int, mx: long");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,1L)",
                    "(5,2L)",
                    "(1,2L)",
                    "(5,2L)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test UNION ONSCHEMA with udf whose default type is different from
     * final type - where udf is not in immediate input of union
     */
    @Test
    public void testUnionOnSchemaUdfTypeEvolution2() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query_prefix =
            " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : int, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
            + ", t : tuple (tc1 : int, tc2 : chararray) );"
            + " l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
            + " (i : int, c : chararray, j : int "
            + ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
            + ", t : tuple (tc1 : int, tc2 : chararray) );"
            + "f1 = foreach l1 generate i, MAX(b.c1) as mx;"
            + "f11 = foreach f1 generate i, mx;"
            + "f2 = foreach l2 generate i, COUNT(b.c1) as mx;"
            + "f22 = foreach f2 generate i, mx;"
            ;

        String query = query_prefix + "u = union onschema f11, f22;";
        Util.registerMultiLineQuery(pig, query);
        Schema sch = pig.dumpSchema("u");
        Schema expectedSch =
            Utils.getSchemaFromString("i: int, mx: long");
        assertEquals("Checking expected schema", expectedSch, sch);

        // verify schema for reverse order of relations as well
        query = query_prefix + "u = union onschema f22, f11;";
        Util.registerMultiLineQuery(pig, query);
        sch = pig.dumpSchema("u");
        expectedSch =
            Utils.getSchemaFromString("i: int, mx: long");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,1L)",
                    "(5,2L)",
                    "(1,2L)",
                    "(5,2L)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Udf that has schema of tuple column with no inner schema.
     * {@link #exec(Tuple)} returns its input tuple unchanged.
     */
    public static class UDFTupleNullSchema extends EvalFunc<Tuple> {
        @Override
        public Tuple exec(Tuple input) {
            return input;
        }

        @Override
        public Schema outputSchema(Schema input) {
            // a TUPLE field schema constructed without an inner schema
            FieldSchema fs =
                new Schema.FieldSchema(getSchemaName("UDFTupleNullSchema", input),
                    DataType.TUPLE);
            return new Schema(fs);
        }
    }

    /**
     * Test UNION ONSCHEMA with input relation having column names with multiple
     * level of namespace in their names
     */
    @Test
    public void testUnionOnSchemaScopeMulti() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query_prefix =
            " a = load '" + INP_FILE_2NUMS+ "' as (i:int, j:int); "
            + "b = group a by i; "
            + "c = foreach b generate group as gp, flatten(a); "
            + "d = group c by $0; "
            + "e = foreach d generate group as gp, flatten(c); "
            + "f = load '" + INP_FILE_2NUMS + "' as (i, j); "
            ;

        String query = query_prefix + "u = union onschema e,f;";
        Util.registerMultiLineQuery(pig, query);
        Schema sch = pig.dumpSchema("u");
        Schema expectedSch =
            Utils.getSchemaFromString("gp: int,c::gp: int,i: int,j: int");
        assertEquals("Checking expected schema", expectedSch, sch);

        query = query_prefix + "u = union onschema f,e;";
        Util.registerMultiLineQuery(pig, query);
        sch = pig.dumpSchema("u");
        expectedSch =
            Utils.getSchemaFromString("i: int,j: int, gp: int,c::gp: int");
        assertEquals("Checking expected schema", expectedSch, sch);

        Iterator<Tuple> it = pig.openIterator("u");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1,2,null,null)",
                    "(5,3,null,null)",
                    "(1,2,1,1)",
                    "(5,3,5,5)",
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }

    /**
     * Test query with a union-onschema having another as input
     */
    @Test
    public void testTwoUnions() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        String query =
            " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : int);"
            + "u1 = union onschema l1, l2;"
            + "l3 = load '" + INP_FILE_2NUMS + "' as (i : long, j : double);"
            + "u2 = union onschema u1, l3;"
            ;
        Util.registerMultiLineQuery(pig, query);

        Iterator<Tuple> it = pig.openIterator("u2");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                new String[] {
                    "(1L,2.0)",
                    "(5L,3.0)",
                    "(1L,2.0)",
                    "(5L,3.0)",
                    "(1L,2.0)",
                    "(5L,3.0)"
                });
        Util.checkQueryOutputsAfterSort(it, expectedRes);
    }
}