blob: a1568be9efb2c580e5aaede934272c124a9d3800 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.PlanValidationException;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.ParserTestingUtils;
import org.apache.pig.test.utils.NewLogicalPlanUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestProjectRange {
// Logger obtained for the runtime class (getClass()), so a subclass logs under its own name.
protected final Log log = LogFactory.getLog(getClass());
// Cluster shared by all tests of this class; built once when the class is initialized.
private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
// Assigned a fresh PigServer in setup() before each test and shut down in tearDown().
protected static PigServer pigServer;
// Name of the five-column input file that oneTimeSetUp() creates.
private static final String INP_FILE_5FIELDS = "TestProjectRange_5fields";
/**
 * Creates the input file shared by all tests: two rows of five
 * tab-separated numbers.
 *
 * @throws Exception if the input file cannot be created
 */
@BeforeClass
public static void oneTimeSetUp() throws Exception {
    Util.createInputFile(
        cluster,
        INP_FILE_5FIELDS,
        new String[] {"10\t20\t30\t40\t50", "11\t21\t31\t41\t51"});
}
/**
 * Gives every test its own PigServer, configured with the execution type
 * and properties of the shared cluster.
 *
 * @throws ExecException if the PigServer cannot be created
 */
@Before
public void setup() throws ExecException {
    pigServer = new PigServer(
        cluster.getExecType(),
        cluster.getProperties());
}
/**
 * Shuts down the PigServer created by {@link #setup()}.
 *
 * @throws Exception if shutting the server down fails
 */
@After
public void tearDown() throws Exception {
    // setup() may have failed before assigning a server; do not add a
    // NullPointerException on top of the original failure.
    if (pigServer == null) {
        return;
    }
    try {
        pigServer.shutdown();
    } finally {
        // Drop the reference so a later test can never shut down (or reuse)
        // a server that has already been stopped.
        pigServer = null;
    }
}
/**
 * Removes the input file and stops the shared cluster once all tests ran.
 *
 * NOTE(review): the file is created through Util.createInputFile(cluster, ...)
 * but deleted here as a local path of the same name - confirm that this is
 * where the file actually lives.
 *
 * @throws Exception if the cluster cannot be shut down
 */
@AfterClass
public static void oneTimeTearDown() throws Exception {
    File inputFile = new File(INP_FILE_5FIELDS);
    // best effort: the outcome of the delete is not checked
    inputFile.delete();
    if (cluster == null) {
        return;
    }
    cluster.shutDown();
}
/**
 * Test project-range in foreach with limits on both sides.
 * The schema of every statement is verified; the rows produced by the
 * last statement are verified as well.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testFullRangeForeachWSchema() throws IOException, ParserException {
    // the load statements used below differ only in the declared type of b
    final String loadWithFloatB =
        " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);";
    final String loadAllInts =
        " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int, e : int);";

    // specifying the new aliases
    compileAndCompareSchema(
        "aa : int, bb : float, cc : int",
        loadWithFloatB + "f = foreach l1 generate a .. c as (aa, bb, cc);",
        "f");

    // specifying the new aliases - refer to column by pos
    compileAndCompareSchema(
        "aa : int, bb : float, cc : int",
        loadWithFloatB + "f = foreach l1 generate $0 .. $2 as (aa, bb, cc);",
        "f");

    // column with pos , name
    compileAndCompareSchema(
        "aa : int, bb : float, cc : int",
        loadWithFloatB + "f = foreach l1 generate $0 .. c as (aa, bb, cc);",
        "f");

    // specifying the new aliases
    compileAndCompareSchema(
        "bb : float, cc : int, dd : int",
        loadWithFloatB + "f = foreach l1 generate b .. d as (bb, cc, dd);",
        "f");

    // begin, end of range is same
    compileAndCompareSchema(
        "bb : float, c : int",
        loadWithFloatB + "f = foreach l1 generate b .. b as (bb), $2 .. $2;",
        "f");

    // without aliases - two projections
    compileAndCompareSchema(
        "a : int, b : int, c : int, d : int, e : int",
        loadAllInts + "f = foreach l1 generate a .. c, d .. e ;",
        "f");

    // without aliases
    compileAndCompareSchema(
        "a : int, b : int, c : int",
        loadAllInts + "f = foreach l1 generate a .. c ;",
        "f");

    // the last registered definition of f is the one that gets executed
    Iterator<Tuple> rows = pigServer.openIterator("f");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStrings(
        new String[] {"(10,20,30)", "(11,21,31)"});
    Util.checkQueryOutputsAfterSort(rows, expectedRows);
}
/**
 * Parses the expected schema string (when one is given) and delegates to
 * the Schema-based overload.
 *
 * @param expectedSchStr expected schema as a string, or null to expect a null schema
 * @param query pig statements to register
 * @param alias alias whose schema is compared
 */
private void compileAndCompareSchema(String expectedSchStr, String query, String alias)
        throws IOException, ParserException {
    Schema expectedSch =
        (expectedSchStr == null) ? null : Utils.getSchemaFromString(expectedSchStr);
    compileAndCompareSchema(expectedSch, query, alias);
}
/**
 * Registers the query with the current PigServer and asserts that the
 * schema it reports for the alias equals the expected one.
 *
 * @param expectedSch schema the alias must have (may be null)
 * @param query pig statements to register
 * @param alias alias whose schema is compared
 */
private void compileAndCompareSchema(Schema expectedSch, String query,
        String alias) throws IOException {
    Util.registerMultiLineQuery(pigServer, query);
    Schema actualSch = pigServer.dumpSchema(alias);
    assertEquals("Checking expected schema", expectedSch, actualSch);
}
/**
 * Test project-range in foreach with only an end limit (".. x").
 * The schema of every statement is verified; the rows produced by the
 * last statement are verified as well.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testEndRangeForeachWSchema() throws IOException, ParserException {
    final String loadWithFloatB =
        " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);";
    final String loadWithLongD =
        " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : long, e : int);";

    // specifying the new aliases
    compileAndCompareSchema(
        "aa : int, bb : float, cc : int",
        loadWithFloatB + "f = foreach l1 generate .. c as (aa, bb, cc);",
        "f");

    // col position
    compileAndCompareSchema(
        "aa : int, bb : float, cc : int",
        loadWithFloatB + "f = foreach l1 generate .. $2 as (aa, bb, cc);",
        "f");

    // end is the beginning!
    compileAndCompareSchema(
        "aa : int",
        loadWithFloatB + "f = foreach l1 generate .. $0 as (aa, bb, cc);",
        "f");

    // same statement as the first one, registered once more
    compileAndCompareSchema(
        "aa : int, bb : float, cc : int",
        loadWithFloatB + "f = foreach l1 generate .. c as (aa, bb, cc);",
        "f");

    // without aliases
    compileAndCompareSchema(
        "a : int, b : int, c : int, d : long",
        loadWithLongD + "f = foreach l1 generate .. $3 ;",
        "f");

    Iterator<Tuple> rows = pigServer.openIterator("f");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStrings(
        new String[] {"(10,20,30,40L)", "(11,21,31,41L)"});
    Util.checkQueryOutputsAfterSort(rows, expectedRows);
}
/**
 * Test project-range in foreach with only a start limit ("x ..").
 * The schema of every statement is verified; the rows produced by the
 * last statement are verified as well.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testStartRangeForeachWSchema() throws IOException, ParserException {
    // every statement starts with the same load; only the "as" clause varies
    final String loadPrefix = " l1 = load '" + INP_FILE_5FIELDS + "' ";
    final String asLongD = "as (a : int, b : int, c : int, d : long, e : int);";

    // specifying the new aliases
    compileAndCompareSchema(
        "aa : int, bb : int, cc : int",
        loadPrefix
            + "as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate c .. as (aa, bb, cc);",
        "f");

    // without aliases
    compileAndCompareSchema(
        "b : int, c : int, d : long, e : int",
        loadPrefix + asLongD + "f = foreach l1 generate $1 .. ;",
        "f");

    // start with last column - beginning is the end!
    compileAndCompareSchema(
        "e : int",
        loadPrefix + asLongD + "f = foreach l1 generate e .. ;",
        "f");

    // specifying the new aliases for one
    compileAndCompareSchema(
        "aa : int, bb : int, cc : int, b : long, c : int, d : int, e : int",
        loadPrefix
            + "as (a : int, b : long, c : int, d : int, e : int);"
            + "f = foreach l1 generate c .. as (aa, bb, cc), b .. ;",
        "f");

    Iterator<Tuple> rows = pigServer.openIterator("f");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStrings(
        new String[] {"(30,40,50,20L,30,40,50)", "(31,41,51,21L,31,41,51)"});
    Util.checkQueryOutputsAfterSort(rows, expectedRows);
}
/**
 * Test multiple different types of range-project (end-limited,
 * two-sided and start-limited) within a single foreach.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testMixRangeForeachWSchema() throws IOException, ParserException {
    // aliases given for the last projection only
    final String aliasedQuery =
        " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
        + "f = foreach l1 generate .. b, c .. d, d .. as (aa, bb);";
    compileAndCompareSchema(
        "a : int, b : float, c : int, d : int, aa : int, bb : int", aliasedQuery, "f");

    // aliases on the single-column ranges, none on the others
    final String mixedQuery =
        " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : long, e : int);"
        + "f = foreach l1 generate ..$0 as (first), e.. as (last), d .., .. b ;";
    compileAndCompareSchema(
        "first : int, last : int, d : long, e : int, a : int, b : int", mixedQuery, "f");

    Iterator<Tuple> rows = pigServer.openIterator("f");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStrings(
        new String[] {"(10,50,40L,50,10,20)", "(11,51,41L,51,11,21)"});
    Util.checkQueryOutputsAfterSort(rows, expectedRows);
}
/**
 * Negative test cases with a schema: a range whose start column comes
 * after its end column must be rejected, whether the columns are given
 * by position or by name.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testNegativeForeachWSchema() throws IOException, ParserException {
    final String load =
        " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);";
    final String expectedMsg = "start column appears after end column in range projection";

    // reversed range by position
    Util.checkExceptionMessage(load + "f = foreach l1 generate $3 .. $1;", "f", expectedMsg);

    // reversed range by name
    Util.checkExceptionMessage(load + "f = foreach l1 generate c .. b;", "f", expectedMsg);
}
/**
 * Negative test cases without a schema: a reversed positional range and
 * a range over column names that do not exist.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testNegativeForeachNOSchema() throws IOException, ParserException {
    // reversed range by position
    final String reversedRange =
        " l1 = load '" + INP_FILE_5FIELDS + "';"
        + "f = foreach l1 generate $3 .. $1;";
    Util.checkExceptionMessage(reversedRange, "f",
        "start column appears after end column in range projection");

    // names cannot be resolved when the load declares no schema
    final String unknownNames =
        " l1 = load '" + INP_FILE_5FIELDS + "' ;"
        + "f = foreach l1 generate a .. b;";
    Util.checkExceptionMessage(unknownNames, "f",
        "Invalid field projection. Projected field [a] does not exist.");
}
/**
 * Negative test case: a foreach projecting a non-existent field (b1),
 * followed by a foreach that uses a project-range.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testNegativeForeachFollowedByRange() throws IOException, ParserException {
    final String query =
        "A = load '" + INP_FILE_5FIELDS + "' as (a0,a1,a2,a3,a4);"
        + "B = FOREACH A GENERATE a0, b1, a2, a3, a4;"
        + "C = FOREACH B GENERATE a0..a2;";

    // In PIG-5335, above query was failing at parsing time
    // and error message didn't even mention "b1".
    // (below generateLogicalPlan was throwing ParserException)
    LogicalPlan plan = generateLogicalPlan(query);

    // After PIG-5335, we moved the error to be caught
    // at validation phase AND have proper error message
    // pointing to an invalid fieldname, b1.
    PlanValidationException validationError = null;
    try {
        plan.validate(pigServer.getPigContext(), "test", false);
    } catch (PlanValidationException ex) {
        validationError = ex;
    }

    assertTrue("No exception was thrown from an invalid script", validationError != null);
    Util.checkMessageInException(validationError,
        "Projected field [b1] does not exist in schema: a0:bytearray,a1:bytearray,a2:bytearray,a3:bytearray,a4:bytearray");
}
/**
 * Test foreach without schema on the load. Note that, despite the method
 * name, the range used here is limited at its end ("..$3").
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testStartRangeForeachNOSchema() throws IOException, ParserException {
    // aliases are supplied in the foreach because the load has none
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS + "';"
        + "f = foreach l1 generate ..$3 as (a,b,c,d);";

    compileAndCompareSchema(
        "a : bytearray,b : bytearray,c : bytearray,d : bytearray", query, "f");

    Util.registerMultiLineQuery(pigServer, query);
    pigServer.explain("f", System.err);

    Iterator<Tuple> rows = pigServer.openIterator("f");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStringAsByteArray(
        new String[] {"('10','20','30','40')", "('11','21','31','41')"});
    Util.checkQueryOutputsAfterSort(rows, expectedRows);
}
/**
 * Test foreach without schema, mixing end-limited and start-limited
 * ranges; the resulting schema is expected to be null.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testMixRangeForeachNOSchema() throws IOException, ParserException {
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS + "';"
        + "f = foreach l1 generate ..$0 as (first), $4.. as (last), $3 .., .. $1 ;";

    compileAndCompareSchema((Schema)null, query, "f");

    Util.registerMultiLineQuery(pigServer, query);
    pigServer.explain("f", System.err);

    Iterator<Tuple> rows = pigServer.openIterator("f");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStringAsByteArray(
        new String[] {
            "('10','50','40','50','10','20')",
            "('11','51','41','51','11','21')",
        });
    Util.checkQueryOutputsAfterSort(rows, expectedRows);
}
/**
 * Test foreach without schema - with a filter applied to the output of
 * the range-projecting foreach.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeForeachWFilterNOSchema() throws IOException, ParserException {
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS + "';"
        + "f = foreach l1 generate ..$0 as (first), $4.. as (last), $3 .., .. $1 ;"
        + " fil = filter f by $0 > 10;";

    Util.registerMultiLineQuery(pigServer, query);
    pigServer.explain("fil", System.err);

    // only the row whose first column is 11 is expected after the filter
    Iterator<Tuple> rows = pigServer.openIterator("fil");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStringAsByteArray(
        new String[] {"('11','51','41','51','11','21')"});
    Util.checkQueryOutputsAfterSort(rows, expectedRows);
}
/**
 * Test project-range as the sort key of order-by when the input has a
 * schema. For each statement the output schema, the number of sort
 * expression plans and their ascending flags are checked; the last
 * statement is also executed.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeOrderByWSchema() throws IOException, ParserException{
    final String load =
        " l1 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);";
    // order-by does not change the schema
    final String schemaStr = "a : int, b : long, c : int, d : int, e : int";
    String query;

    // end-limited range, descending
    query = load + " o = order l1 by .. $2 DESC ;";
    compileAndCompareSchema(schemaStr, query, "o");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 3, new boolean[] {false, false, false});

    // start-limited range, default order
    query = load + " o = order l1 by $3 .. ;";
    compileAndCompareSchema(schemaStr, query, "o");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 2, new boolean[] {true, true});

    // start-limited range by name, descending
    query = load + " o = order l1 by d .. DESC ;";
    compileAndCompareSchema(schemaStr, query, "o");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 2, new boolean[] {false, false});

    // order-by placed between a foreach and a limit
    query = load
        + " f = foreach l1 generate *;"
        + " o = order f by $0 .. c ASC ;"
        + " lim = limit o 10; ;";
    compileAndCompareSchema(schemaStr, query, "lim");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 3, new boolean[] {true, true, true});

    // range covering all columns; this one is executed too
    query = load + " o = order l1 by $0 .. $4 ;";
    compileAndCompareSchema(schemaStr, query, "o");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 5, new boolean[] {true, true, true, true, true});

    Util.registerMultiLineQuery(pigServer, query);
    Iterator<Tuple> rows = pigServer.openIterator("o");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStrings(
        new String[] {"(10,20L,30,40,50)", "(11,21L,31,41,51)"});
    Util.checkQueryOutputs(rows, expectedRows);
}
/**
 * Test nested order-by (inside a foreach over a group) with schema.
 * For each statement the output schema, the number of sort expression
 * plans and their ascending flags are checked; the last statement is
 * also executed.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeOrderByNestedWSchema() throws IOException, ParserException{
    final String loadAndGroup =
        " l1 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);"
        + " g = group l1 by a;";
    // all statements below produce the same schema for f
    final String expectedSchStr = "g : int,o: {t : (a: int,b: long,c: int,d: int,e: int)}";
    String query;

    // end-limited range, descending
    query = loadAndGroup
        + " f = foreach g { o = order l1 by .. $2 DESC; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema(expectedSchStr), query, "f");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 3, new boolean[] {false, false, false});

    // start-limited range, default order
    query = loadAndGroup
        + " f = foreach g { o = order l1 by d .. ; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema(expectedSchStr), query, "f");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 2, new boolean[] {true, true});

    // two ranges with different sort directions
    query = loadAndGroup
        + " f = foreach g { o = order l1 by $2 .. $3 ASC, $1..c DESC; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema(expectedSchStr), query, "f");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 4, new boolean[] {true, true, false, false});

    // a range followed by a single column; this one is executed too
    query = loadAndGroup
        + " f = foreach g { o = order l1 by $2 .. $3 DESC, $1 ASC; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema(expectedSchStr), query, "f");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 3, new boolean[] {false, false, true});

    Util.registerMultiLineQuery(pigServer, query);
    Iterator<Tuple> rows = pigServer.openIterator("f");
    String[] expectedRows = {
        "(10,{(10,20,30,40,50)})",
        "(11,{(11,21,31,41,51)})",
    };
    Schema actualSchema = pigServer.dumpSchema("f");
    Util.checkQueryOutputs(
        rows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(actualSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Test nested order-by (inside a foreach over a group) without schema.
 * For each statement the output schema, the number of sort expression
 * plans and their ascending flags are checked; the last statement is
 * also executed.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeOrderByNestedNOSchema() throws IOException, ParserException{
    final String load = " l1 = load '" + INP_FILE_5FIELDS + "';";
    // schema of f when grouping on a column of the schema-less input
    final String byteArrayGroupSch = "g : bytearray, o: {t : ()}";
    String query;

    // end-limited range, descending
    query = load
        + " g = group l1 by $0;"
        + " f = foreach g { o = order l1 by .. $2 DESC; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema(byteArrayGroupSch), query, "f");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 3, new boolean[] {false, false, false});

    // start-limited range
    query = load
        + " g = group l1 by $0;"
        + " f = foreach g { o = order l1 by $3 .. ; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema(byteArrayGroupSch), query, "f");
    //project to end can't be expanded
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 1, new boolean[] {true});

    // two ranges with different sort directions
    query = load
        + " g = group l1 by $1;"
        + " f = foreach g { o = order l1 by $2 .. $3 ASC, $1 .. $2 DESC; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema(byteArrayGroupSch), query, "f");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 4, new boolean[] {true, true, false, false});

    // grouping on the constant 1; this one is executed too
    query = load
        + " g = group l1 by 1;"
        + " f = foreach g { o = order l1 by $2 .. $3 desc; generate group, o;}";
    compileAndCompareSchema(getCleanedGroupSchema("g : int, o: {t : ()}"), query, "f");
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 2, new boolean[] {false, false});

    Util.registerMultiLineQuery(pigServer, query);
    Iterator<Tuple> rows = pigServer.openIterator("f");
    String[] expectedRows = {
        "(1,{(11,21,31,41,51),(10,20,30,40,50)})",
    };
    Schema actualSchema = pigServer.dumpSchema("f");
    Util.checkQueryOutputs(
        rows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(actualSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Asserts that the LOSort operator found in the plan has the expected
 * number of sort-column plans and the expected ascending flag for each
 * sort column.
 *
 * @param lp logical plan in which the LOSort operator is looked up
 * @param numPlans expected number of sort-column expression plans
 * @param isAsc expected ascending flag of every sort column, in order
 * @return the LOSort operator that was checked
 */
private LOSort checkNumExpressionPlansForSort(LogicalPlan lp, int numPlans, boolean[] isAsc) {
    LOSort sort = (LOSort) NewLogicalPlanUtil.getRelOpFromPlan(lp, LOSort.class);
    assertEquals("number of sort col plans", numPlans, sort.getSortColPlans().size());

    List<Boolean> ascCols = sort.getAscendingCols();
    // Compare the lengths before the elements: otherwise a longer list ends in an
    // ArrayIndexOutOfBoundsException and a shorter one silently skips expected flags.
    assertEquals("number of ascending flags", isAsc.length, ascCols.size());
    for (int i = 0; i < isAsc.length; i++) {
        assertEquals("ascending order of sort column " + i, isAsc[i], ascCols.get(i));
    }
    return sort;
}
/**
 * Builds the logical plan for the query and runs plan validation on it.
 *
 * @param query pig statements to build the plan from
 * @return the validated logical plan
 * @throws FrontendException if validation of the plan fails
 */
private LogicalPlan createAndProcessLPlan(String query) throws FrontendException {
    //TODO: create a common util function for logical plan tests
    LogicalPlan plan = generateLogicalPlan(query);
    plan.validate(pigServer.getPigContext(), "test", false);
    return plan;
}
/**
 * Builds the logical plan for the query, failing the calling test when
 * the plan cannot be generated.
 *
 * @param query pig statements to build the plan from
 * @return the logical plan generated for the query
 * @throws AssertionError if plan generation throws; the original
 *         exception is attached as the cause
 */
private LogicalPlan generateLogicalPlan(String query) {
    try {
        return ParserTestingUtils.generateLogicalPlan( query );
    } catch(Exception ex) {
        // Attach the exception as the cause instead of printing its stack trace,
        // so the failure report itself shows where plan generation went wrong.
        AssertionError failure = new AssertionError(
            "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
        failure.initCause(ex);
        throw failure;
    }
}
/**
 * Test order-by with schema where the sort key mixes a two-sided range,
 * a start-limited range and a single column.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeOrderByMixWSchema() throws IOException, ParserException{
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);"
        + " o = order l1 by b .. c, d .. DESC, a DESC;";

    compileAndCompareSchema("a : int, b : long, c : int, d : int, e : int", query, "o");

    //check number of sort expression plans
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 5, new boolean[] {true, true, false, false, false});

    Util.registerMultiLineQuery(pigServer, query);
    Iterator<Tuple> rows = pigServer.openIterator("o");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStrings(
        new String[] {"(10,20L,30,40,50)", "(11,21L,31,41,51)"});
    Util.checkQueryOutputs(rows, expectedRows);
}
/**
 * Test order-by without schema where the sort key mixes a two-sided
 * range, a single column and a trailing start-limited range.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeOrderByMixNOSchema() throws IOException, ParserException{
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS + "';"
        + " o = order l1 by $1 .. $2 DESC, $0 , $4 .. DESC;";

    compileAndCompareSchema((Schema)null, query, "o");

    //check number of sort expression plans
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 4, new boolean[] {false, false, true, false});

    Util.registerMultiLineQuery(pigServer, query);
    pigServer.explain("o", System.err);

    Iterator<Tuple> rows = pigServer.openIterator("o");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStringAsByteArray(
        new String[] {
            "('11','21','31','41','51')",
            "('10','20','30','40','50')",
        });
    Util.checkQueryOutputs(rows, expectedRows);
}
/**
 * Test order-by without schema on a single start-limited range
 * ("$3 .."), sorted descending.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeOrderByStartNOSchema() throws IOException, ParserException{
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS + "';"
        + " o = order l1 by $3 .. DESC;";

    compileAndCompareSchema((Schema)null, query, "o");

    //check number of sort expression plans
    checkNumExpressionPlansForSort(
        createAndProcessLPlan(query), 1, new boolean[] {false});

    Util.registerMultiLineQuery(pigServer, query);
    pigServer.explain("o", System.err);

    Iterator<Tuple> rows = pigServer.openIterator("o");
    List<Tuple> expectedRows = Util.getTuplesFromConstantTupleStringAsByteArray(
        new String[] {
            "('11','21','31','41','51')",
            "('10','20','30','40','50')",
        });
    Util.checkQueryOutputs(rows, expectedRows);
}
/**
 * Negative test: without schema, a start-limited range that is not the
 * last sort column of an order-by must be rejected.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeOrderByStartNegNOSchema() throws IOException, ParserException{
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS + "';"
        + " o = order l1 by $3 .. DESC, $1;";
    final String expectedMsg =
        "Project-range to end (eg. x..) is supported in order-by only as last sort column";

    Util.checkExceptionMessage(query, "o", expectedMsg);
}
/**
 * Test project-range as the key of a group over two inputs that have a
 * schema. For each statement the output schema and the number of group
 * expression plans are checked; the last statement is also executed.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeGroupWSchema() throws IOException, ParserException{
    // both inputs are loaded from the same file with the same schema
    final String loads =
        " l1 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);" +
        " l2 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);";
    // bag part of the expected schema, identical for all statements
    final String bagsSchStr =
        "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
        "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";
    String query;
    Schema expectedSch;

    // start-limited ranges
    query = loads + " g = group l1 by d .., l2 by d ..;";
    expectedSch = getCleanedGroupSchema("grp: (d: int,e : int)," + bagsSchStr);
    compileAndCompareSchema(expectedSch, query, "g");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 2);

    // two-sided ranges, by name and by position
    query = loads + " g = group l1 by c .. $3, l2 by $3..$4;";
    expectedSch = getCleanedGroupSchema("grp: (c: int,d : int)," + bagsSchStr);
    compileAndCompareSchema(expectedSch, query, "g");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 2);

    // end-limited ranges; this one is executed too
    query = loads + " g = group l1 by .. c, l2 by .. c;";
    expectedSch = getCleanedGroupSchema("grp: (a: int,b: long,c: int)," + bagsSchStr);
    compileAndCompareSchema(expectedSch, query, "g");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 3);

    Util.registerMultiLineQuery(pigServer, query);
    String[] expectedRows = {
        "((10,20,30),{(10,20,30,40,50)},{(10,20,30,40,50)})",
        "((11,21,31),{(11,21,31,41,51)},{(11,21,31,41,51)})",
    };
    Iterator<Tuple> rows = pigServer.openIterator("g");
    Schema actualSchema = pigServer.dumpSchema("g");
    Util.checkQueryOutputs(
        rows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(actualSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Builds the expected schema of a group statement from a schema string.
 * Some transformations are applied to the parsed schema, because the
 * parser does not accept "group" as a column name in a schema, and to add
 * the tuple within the bag schema: the first field is renamed to "group"
 * and the alias of the first inner field of the second and third fields
 * (when present) is cleared.
 *
 * @param expectedSchStr schema string whose first field stands for the group column
 * @return the transformed schema
 * @throws ParserException
 * @throws FrontendException
 */
private Schema getCleanedGroupSchema(String expectedSchStr) throws ParserException, FrontendException {
    Schema parsedSch = Utils.getSchemaFromString(expectedSchStr);
    parsedSch.getField(0).alias = "group";

    // fields 1 and 2 (when present): drop the alias of their first inner field
    for (int i = 1; i <= 2 && i < parsedSch.size(); i++) {
        parsedSch.getField(i).schema.getField(0).alias = null;
    }

    return org.apache.pig.newplan.logical.Util.fixSchemaAddTupleInBag(parsedSch);
}
/**
 * Asserts that every input of the LOCogroup operator found in the plan
 * has the expected number of group-by expression plans.
 *
 * @param lp logical plan in which the LOCogroup operator is looked up
 * @param numPlans expected number of group-by plans for each input
 * @return the LOCogroup operator that was checked
 */
private LOCogroup checkNumExpressionPlansForGroup(LogicalPlan lp, int numPlans) {
    LOCogroup cogroup = (LOCogroup) NewLogicalPlanUtil.getRelOpFromPlan(lp, LOCogroup.class);
    for (int inputIndex : cogroup.getExpressionPlans().keySet()) {
        assertEquals(
            "number of group-by plans",
            numPlans,
            cogroup.getExpressionPlans().get(inputIndex).size());
    }
    return cogroup;
}
/**
 * Test a group over two inputs with schema where each key mixes an
 * arithmetic expression, a two-sided range and a start-limited range.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeCoGroupMixWSchema() throws IOException, ParserException{
    final String query =
        " l1 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);" +
        " l2 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);" +
        " g = group l1 by (a + b, c .. d, e.. ), l2 by ($0 + $1, c..d, $4..);";

    Schema expectedSch = getCleanedGroupSchema(
        "grp: (x : long, c :int , d :int, e : int)," +
        "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
        "l2: {t : (a: int,b: long,c: int,d: int,e: int)}");
    // "x" is only a placeholder in the schema string; its alias is cleared here
    expectedSch.getField(0).schema.getField(0).alias = null;
    compileAndCompareSchema(expectedSch, query, "g");

    //check number of group expression plans
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 4);

    Util.registerMultiLineQuery(pigServer, query);
    String[] expectedRows = {
        "((30,30,40,50),{(10,20,30,40,50)},{(10,20,30,40,50)})",
        "((32,31,41,51),{(11,21,31,41,51)},{(11,21,31,41,51)})",
    };
    Iterator<Tuple> rows = pigServer.openIterator("g");
    Schema actualSchema = pigServer.dumpSchema("g");
    Util.checkQueryOutputs(
        rows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(actualSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Test project-range as the key of a single-input group with schema:
 * first a two-sided range, then a start-limited range followed by a
 * limit, which is also executed.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeGroupMixWSchema() throws IOException, ParserException{
    final String load =
        " l1 = load '" + INP_FILE_5FIELDS +
        "' as (a : int, b : long, c : int, d : int, e : int);";
    final String bagSchStr = "l1: {t : (a: int,b: long,c: int,d: int,e: int)}";
    String query;
    Schema expectedSch;

    // two-sided range
    query = load + " g = group l1 by b .. c;";
    expectedSch = getCleanedGroupSchema("grp: (b : long, c :int)," + bagSchStr);
    compileAndCompareSchema(expectedSch, query, "g");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 2);

    // start-limited range, followed by a limit
    query = load +
        " g = group l1 by $2 .. ;" +
        " lim = limit g 2;";
    expectedSch = getCleanedGroupSchema("grp: (c :int , d :int, e : int)," + bagSchStr);
    compileAndCompareSchema(expectedSch, query, "lim");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 3);

    Util.registerMultiLineQuery(pigServer, query);
    String[] expectedRows = {
        "((30,40,50),{(10,20L,30,40,50)})",
        "((31,41,51),{(11,21L,31,41,51)})",
    };
    Iterator<Tuple> rows = pigServer.openIterator("lim");
    Schema actualSchema = pigServer.dumpSchema("lim");
    Util.checkQueryOutputs(
        rows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(actualSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Test project-range as the key of a single-input group without schema:
 * an end-limited range, a single-column range and a start-limited range,
 * the last of which is also executed.
 * @throws IOException
 * @throws ParserException
 */
@Test
public void testRangeGroupMixNOSchema() throws IOException, ParserException{
    final String load = " l1 = load '" + INP_FILE_5FIELDS + "';";
    String query;
    Schema expectedSch;

    // end-limited range: three key columns
    query = load + " g = group l1 by .. $2;";
    expectedSch = getCleanedGroupSchema("g : (duma, dumb, dumc), l1: {t : ()}");
    // the names in the schema string are placeholders only; clear them
    setAliasesToNull(expectedSch.getField(0).schema);
    compileAndCompareSchema(expectedSch, query, "g");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 3);

    // range whose start and end are the same column
    query = load + " g = group l1 by $3 .. $3;";
    expectedSch = getCleanedGroupSchema("g : bytearray, l1: {t : ()}");
    compileAndCompareSchema(expectedSch, query, "g");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 1);

    // start-limited range; a single plan is expected here
    query = load + " g = group l1 by $2 .. ;";
    expectedSch = getCleanedGroupSchema("grp: (), l1: {t : ()}");
    compileAndCompareSchema(expectedSch, query, "g");
    checkNumExpressionPlansForGroup(createAndProcessLPlan(query), 1);

    Util.registerMultiLineQuery(pigServer, query);
    String[] expectedRows = {
        "((30,40,50),{(10,20,30,40,50)})",
        "((31,41,51),{(11,21,31,41,51)})",
    };
    Iterator<Tuple> rows = pigServer.openIterator("g");
    Schema actualSchema = pigServer.dumpSchema("g");
    Util.checkQueryOutputs(
        rows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(actualSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Clears the alias of every field returned by {@code schema.getFields()}.
 *
 * @param schema schema whose field aliases are set to {@code null} in place
 */
private void setAliasesToNull(Schema schema) {
    Iterator<FieldSchema> fieldIter = schema.getFields().iterator();
    while (fieldIter.hasNext()) {
        FieldSchema field = fieldIter.next();
        field.alias = null;
    }
}
/**
 * Joins two relations loaded with a declared schema on a key list that
 * mixes an arithmetic expression, a closed range and an open-ended range.
 * Checks the compiled schema of {@code j}, the number of join expression
 * plans, and the rows produced when the script is executed.
 */
@Test
public void testRangeJoinMixWSchema() throws IOException, ParserException{
    // Both loads read the same file and declare the same columns.
    final String loadPrefix = "' as (a : int, b : long, c : int, d : int, e : int);";
    final String joinQuery =
        " l1 = load '" + INP_FILE_5FIELDS + loadPrefix +
        " l2 = load '" + INP_FILE_5FIELDS + loadPrefix +
        " j = join l1 by (a + b, c .. d, e.. ), l2 by ($0 + $1, c..d, $4..);";

    final String joinedSchStr =
        "l1::a: int, l1::b: long, l1::c: int, l1::d: int, l1::e: int," +
        "l2::a: int, l2::b: long, l2::c: int, l2::d: int, l2::e: int";
    compileAndCompareSchema(joinedSchStr, joinQuery, "j");

    // check number of join expression plans
    checkNumExpressionPlansForJoin(createAndProcessLPlan(joinQuery), 4);

    // Run the script and compare the rows of "j" with the expected tuples.
    Util.registerMultiLineQuery(pigServer, joinQuery);
    final String[] expectedRows = {
        "(10,20,30,40,50,10,20,30,40,50)",
        "(11,21,31,41,51,11,21,31,41,51)",
    };
    final Iterator<Tuple> actualRows = pigServer.openIterator("j");
    final Schema joinSchema = pigServer.dumpSchema("j");
    Util.checkQueryOutputs(
        actualRows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(joinSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Joins two relations loaded without a schema on the positional range
 * {@code $0 .. $3}. Expects a null schema for {@code j}, 4 join
 * expression plans, and compares the rows produced on execution.
 */
@Test
public void testRangeJoinMixNOSchema() throws IOException, ParserException{
    final String joinQuery =
        " l1 = load '" + INP_FILE_5FIELDS + "';" +
        " l2 = load '" + INP_FILE_5FIELDS + "';" +
        " j = join l1 by $0 .. $3, l2 by $0 .. $3;";

    // Typed as Schema so the Schema overload of compileAndCompareSchema is chosen.
    final Schema noSchema = null;
    compileAndCompareSchema(noSchema, joinQuery, "j");

    // check number of join expression plans
    checkNumExpressionPlansForJoin(createAndProcessLPlan(joinQuery), 4);

    // Run the script and compare the rows of "j" with the expected tuples.
    Util.registerMultiLineQuery(pigServer, joinQuery);
    final String[] expectedRows = {
        "(10,20,30,40,50,10,20,30,40,50)",
        "(11,21,31,41,51,11,21,31,41,51)",
    };
    final Iterator<Tuple> actualRows = pigServer.openIterator("j");
    final Schema joinSchema = pigServer.dumpSchema("j");
    Util.checkQueryOutputs(
        actualRows,
        expectedRows,
        org.apache.pig.newplan.logical.Util.translateSchema(joinSchema),
        Util.isSparkExecType(cluster.getExecType()));
}
/**
 * Negative case: a cogroup keyed on the open-ended range {@code $0 ..}
 * over two relations loaded without a schema. The script and the expected
 * error text are handed to {@code Util.checkExceptionMessage} for alias
 * {@code g}.
 */
@Test
public void testRangeCoGroupNegNoSchema() throws IOException, ParserException{
    // cogroup on a range running to the last column, with no schema declared
    final String cogroupQuery =
        " l1 = load '" + INP_FILE_5FIELDS + "';" +
        " l2 = load '" + INP_FILE_5FIELDS + "';" +
        " g = cogroup l1 by ($0 .. ), l2 by ($0 .. );";

    final String expectedError =
        "Cogroup/Group by '*' or 'x..' " +
        "(range of columns to the end) " +
        "is only allowed if the input has a schema";

    Util.checkExceptionMessage(cogroupQuery, "g", expectedError);
}
/**
 * Fetches the {@link LOJoin} that {@code NewLogicalPlanUtil.getRelOpFromPlan}
 * returns for {@code lp} and asserts that the plan list stored under every
 * key of its expression-plan map has exactly {@code numPlans} entries.
 *
 * @param lp logical plan handed to {@code getRelOpFromPlan}
 * @param numPlans expected size of each per-key list of expression plans
 * @return the join operator that was checked
 */
private LOJoin checkNumExpressionPlansForJoin(LogicalPlan lp, int numPlans) {
    Class<?> targetOpType = LOJoin.class;
    LOJoin joinOp = (LOJoin) NewLogicalPlanUtil.getRelOpFromPlan(lp, targetOpType);
    for (int inputIdx : joinOp.getExpressionPlans().keySet()) {
        List<LogicalExpressionPlan> inputPlans =
            joinOp.getExpressionPlans().get(inputIdx);
        assertEquals("number of join exp plans", numPlans, inputPlans.size());
    }
    return joinOp;
}
}