blob: e46686fb414b5183adcddff9d7edfaac2d729a68 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.ExecType.LOCAL;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test Project-(star/range) expansion when used as udf argument
*/
public class TestProjectStarRangeInUdf {
protected final Log log = LogFactory.getLog(getClass());
protected static PigServer pigServer;
private static final String INP_FILE_5FIELDS = "TestProjectRange_5fields";
@BeforeClass
public static void oneTimeSetUp() throws Exception {
String[] input = {"10\t20\t30\t40\t50", "11\t21\t31\t41\t51"};
Util.createLocalInputFile(INP_FILE_5FIELDS, input);
}
@Before
public void setup() throws ExecException{
pigServer = new PigServer(LOCAL);
}
@After
public void tearDown() throws Exception {
pigServer.shutdown();
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
new File(INP_FILE_5FIELDS).delete();
}
@Test
public void testProjStarExpandInForeach1() throws IOException{
//star expansion lets CONCAT be used if input has two cols
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a, b);"
+ "f = foreach l1 generate CONCAT(*) as ct;"
;
compileAndCompareSchema("ct : bytearray", query, "f");
}
@Test
public void testProjStarExpandInForeach1Multi() throws IOException{
//star expansion gives 3 columns, CONCAT(*) should pass
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a, b, c);"
+ "f = foreach l1 generate CONCAT(*) as ct;"
;
compileAndCompareSchema("ct : bytearray", query, "f");
}
@Test
public void testProjStarExpandInForeach1NegativeNoSchema() throws IOException{
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' ;"
+ "f = foreach l1 generate CONCAT(*) as ct;"
;
Util.checkExceptionMessage(query, "f",
"Could not infer the matching function for " +
"org.apache.pig.builtin.CONCAT");
query =
" l1 = load '" + INP_FILE_5FIELDS + "' ;"
+ "f = foreach l1 generate SIZE(*) as ct;"
;
Util.checkExceptionMessage(query, "f",
"Could not infer the matching function for " +
"org.apache.pig.builtin.SIZE");
}
@Test
public void testProjStarExpandInForeach2() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+ "f = foreach l1 generate TOTUPLE(*) as tb;"
;
compileAndCompareSchema("tb : (a : int, b : int, c : int)", query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"((10,20,30))",
"((11,21,31))",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
//PIG-2223
// lookup on column name in udf output tuple schema
@Test
public void testProjStarExpandInForeachLookup1() throws IOException {
String query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+ "f = foreach l1 generate TOTUPLE(*) as tb;"
+ "f2 = foreach f generate tb.a, tb.b;"
;
compileAndCompareSchema("a : int, b : int", query, "f2");
}
//PIG-2223
// lookup on column name in udf output tuple schema
@Test
public void testProjStarExpandInForeachLookup2() throws IOException {
String query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+ "f = foreach l1 generate TOTUPLE(b .. ) as tb;"
+ "f2 = foreach f generate tb.b as b2, tb.c as c2;"
+ "f3 = foreach f2 generate b2, b2 + c2 as bc2;"
;
compileAndCompareSchema("b2 : int, bc2 : int", query, "f3");
}
@Test
public void testProjStarExpandInFilter1() throws IOException{
//TOBAG has * and a bincond expression as argument
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int);"
+ "f = filter l1 by SUM(TOBAG((a == 10 ? 100 : 0), *)) == 130;"
;
compileAndCompareSchema("a : int, b : int", query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"(10,20)",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
@Test
public void testProjRangeExpandInFilterNoSchema1() throws IOException{
//star expansion lets CONCAT be used if input has two cols
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' ;"
+ "f = filter l1 by SUM(TOBAG($0 .. $1)) == 30;"
;
compileAndCompareSchema((Schema)null, query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStringAsByteArray(
new String[] {
"('10','20','30','40','50')",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
/**
* Test project-range in foreach with limits on both sides
* @throws IOException
* @throws ParseException
*/
@Test
public void testProjRangeExpandInForeach() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a, b : chararray, c : chararray, d);"
+ "f = foreach l1 generate CONCAT($1 .. $2) as ct;"
;
compileAndCompareSchema("ct : chararray", query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"('2030')",
"('2131')",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
@Test
public void testProjRangeExpandInJoin() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : chararray, b : chararray, c : chararray, d);"
+ "f1 = foreach l1 generate a, b, c, '1' as num;"
+ "l2 = load '" + INP_FILE_5FIELDS + "' as (a : chararray, b : chararray, c : chararray, d);"
+ "f2 = foreach l1 generate c, a, b, '2' as num;"
+ "j = join f1 by CONCAT($0 .. $1), f2 by CONCAT(a .. b);"
;
String schStr =
"f1::a : chararray, f1::b : chararray, f1::c : chararray, f1::num : chararray," +
"f2::c : chararray, f2::a : chararray, f2::b : chararray, f2::num : chararray";
compileAndCompareSchema(schStr, query, "j");
Iterator<Tuple> it = pigServer.openIterator("j");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"('10', '20', '30', '1', '30', '10', '20', '2')",
"('11', '21', '31', '1', '31', '11', '21', '2')",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
@Test
public void testProjMixExpand1() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+ "f = foreach l1 generate TOBAG(*, $0 .. $2) as tt;"
;
compileAndCompareSchema("tt : {(NullAlias : int)}", query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"({(10),(20),(30),(10),(20),(30)})",
"({(11),(21),(31),(11),(21),(31)})",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
@Test
public void testProjMixExpand1NoSchema() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "';"
+ "f = foreach l1 generate TOBAG(*, $0 .. $2) as tt;"
;
Schema sch = Utils.getSchemaFromString("tt : {(NullALias)}");
sch.getField(0).schema.getField(0).schema.getField(0).alias = null;
sch.getField(0).schema.getField(0).schema.getField(0).type = DataType.NULL;
compileAndCompareSchema(sch, query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStringAsByteArray(
new String[] {
"({('10'),('20'),('30'),('40'),('50'),('10'),('20'),('30')})",
"({('11'),('21'),('31'),('41'),('51'),('11'),('21'),('31')})",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
@Test
public void testProjMixExpand2() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int);"
+ "f = foreach l1 generate TOTUPLE(1, $0 .. $1, 2+3, $2 .. , d - 1) as tt;"
;
String schStr = "tt : (NullAliasA : int, a : int, b : int," +
" NullAliasB : int, c : int, d : int, NullAliasC : int)";
compileAndCompareSchema(schStr, query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"((1,10,20,5,30,40,39))",
"((1,11,21,5,31,41,40))",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
@Test
public void testProjMixExpand2NoSchema() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' ;"
+ "f = foreach l1 generate TOTUPLE(1, $0 .. $1, 2+3, $2 .. , $4 -1) as tt;"
;
compileAndCompareSchema("tt :()", query, "f");
pigServer.explain("f", System.out);
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStringAsByteArray(
new String[] {
"((1,'10','20',5,'30','40','50',49))",
"((1,'11','21',5,'31','41','51',50))",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
@Test
public void testProjMixExpand3() throws IOException {
String query;
query =
" l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : chararray, d : chararray);"
+ "f = foreach l1 generate TOTUPLE($0 .. $1, CONCAT($2 .. )) as tt;"
;
String schStr = "tt : (a : int, b : int, NullAlias : chararray)";
compileAndCompareSchema(schStr, query, "f");
Iterator<Tuple> it = pigServer.openIterator("f");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"((10,20,'3040'))",
"((11,21,'3141'))",
});
Util.checkQueryOutputsAfterSort(it, expectedRes);
}
private void compileAndCompareSchema(String expectedSchStr, String query, String alias)
throws IOException {
Schema expectedSch = null;
if(expectedSchStr != null)
expectedSch = Utils.getSchemaFromString(expectedSchStr);
Util.schemaReplaceNullAlias(expectedSch);
compileAndCompareSchema(expectedSch, query, alias);
}
private void compileAndCompareSchema(Schema expectedSch, String query,
String alias) throws IOException {
Util.registerMultiLineQuery(pigServer, query);
Schema sch = pigServer.dumpSchema(alias);
assertEquals("Checking expected schema", expectedSch, sch);
}
}