blob: da3c44dac84674e9b6edbd2f5e9fc09f4d5fbaeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.parser;
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.antlr.runtime.MismatchedTokenException;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.TestEvalPipeline.MapUDF;
import org.apache.pig.test.Util;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Lists;
public class TestLogicalPlanGenerator {
static File command;
private PigServer pigServer;
@Before
public void setUp() throws Exception {
pigServer = new PigServer(Util.getLocalTestMode());
}
@BeforeClass
public static void oneTimeSetup() throws IOException, Exception {
// Perl script
String[] script =
new String[] {
"#!/usr/bin/perl",
"open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
"while (<INFILE>) {",
" chomp $_;",
" print STDOUT \"$_\n\";",
" print STDERR \"STDERR: $_\n\";",
"}",
};
command = Util.createInputFile("script", "pl", script);
}
@Test
public void test1() throws Exception {
String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = limit A 100; " +
"C = filter B by 2 > 1; " +
"D = load 'y' as (d1, d2); " +
"E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16; " +
"F = store E into 'output';";
generateLogicalPlan( query );
}
@Test
public void test2() throws Exception {
String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = distinct A partition by org.apache.pig.Identity; " +
"C = sample B 0.49; " +
"D = order C by $0, $1; " +
"E = load 'y' as (d1, d2); " +
"F = union onschema D, E; " +
"G = load 'z' as (g1:int, g2:tuple(g21, g22)); " +
"H = cross F, G; " +
"split H into I if 10 > 5, J if 'world' eq 'hello', K if 77 <= 200; " +
"L = store J into 'output';";
generateLogicalPlan( query );
}
@Test
public void test3() throws Exception {
String query = "a = load '1.txt' as (name, age, gpa);" +
"b = group a by name PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner2;" +
"c = foreach b generate group, COUNT(a.age);" +
"store c into 'y';";
generateLogicalPlan( query );
}
private void generateLogicalPlan(String query) throws Exception {
ParserTestingUtils.generateLogicalPlan( query );
}
@Test
public void test4() throws Exception {
String query = "A = load 'x'; " +
"B = mapreduce '" + "myjar.jar" + "' " +
"Store A into 'table_testNativeMRJobSimple_input' "+
"Load 'table_testNativeMRJobSimple_output' "+
"`org.apache.pig.test.utils.WordCount -files " + "file " +
"table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output " +
"stopworld.file" + "`;" +
"C = Store B into 'output';";
generateLogicalPlan( query );
}
// Test define function.
@Test
public void test5() throws Exception {
String query = "define myudf org.apache.pig.builtin.PigStorage( ',' );" +
"A = load 'x' using myudf;" +
"store A into 'y';";
generateLogicalPlan( query );
}
@Test
public void test6() throws Exception {
String query = "A = load 'x' as ( a : int, b, c : chararray );" +
"B = group A by ( a, $2 );" +
"store B into 'y';";
generateLogicalPlan( query );
}
@Test
public void test7() throws Exception {
String query = "A = load 'x' as ( a : int, b, c : chararray );" +
"B = foreach A generate a, $2;" +
"store B into 'y';";
generateLogicalPlan( query );
}
@Test
public void test8() throws Exception {
String query = "A = load 'x' as ( a : int, b, c : chararray );" +
"B = group A by a;" +
"C = foreach B { S = A.b; generate S; };" +
"store C into 'y';";
generateLogicalPlan( query );
}
@Test
public void test9() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { R = a; S = R.u; T = limit S 100; generate S, T, c + d/5; };" +
"store B into 'y';";
generateLogicalPlan( query );
}
@Test
public void test10() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { S = a; T = limit S 100; generate T; };" +
"store B into 'y';";
generateLogicalPlan( query );
}
@Test
public void test11() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { T = limit a 100; generate T; };" +
"store B into 'y';";
generateLogicalPlan( query );
}
@Test
public void test12() throws Exception {
String query = "define CMD `perl GroupBy.pl '\t' 0 1` ship('"+Util.encodeEscape(command.toString())+"');" +
"A = load 'x';" +
"B = group A by $0;" +
"C = foreach B {" +
" D = order A by $1; " +
" generate flatten(D);" +
"};" +
"E = stream C through CMD;" +
"store E into 'y';";
generateLogicalPlan( query );
}
@Test
public void test13() throws Exception {
String query = "define CMD `perl PigStreaming.pl` ship('"+Util.encodeEscape(command.toString())+"') stderr('CMD');" +
"A = load 'x';" +
"C = stream A through CMD;" +
"store C into 'y';";
generateLogicalPlan( query );
}
@Test
public void test14() throws Exception {
String query = "a = load 'x1' using PigStorage() as (name, age:int, gpa);" +
"b = load 'x2' as (name, age, registration, contributions);" +
"e = cogroup a by name, b by name parallel 8;" +
"f = foreach e generate group, SUM(a.age) as s;" +
"g = filter f by s>0;" +
"store g into 'y';";
generateLogicalPlan( query );
}
@Test
public void test15() throws Exception {
String query = "a = load 'x1' using PigStorage() as (name, age, gpa);" +
"b = group a all;" +
"c = foreach b generate AVG(a.age) as avg; " +
"d = load 'x2' using PigStorage() as (name, age, registration, contributions);" +
"e = group d all;" +
"f = foreach e generate AVG(d.age) as avg;" +
"y = foreach a generate age/c.avg, age/f.avg;" +
"store y into 'y';";
generateLogicalPlan( query );
}
@Test
public void test16() throws Exception {
String query = "AA = load 'x';" +
"A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);" +
"store A into 'y';";
generateLogicalPlan( query );
}
@Test
public void test17() throws Exception {
String query = "store ( load 'x' ) into 'y';";
generateLogicalPlan( query );
}
@Test
public void test18() throws Exception {
String query = "A = load 'x';\n" +
"C = group (foreach A generate $0 parallel 5) all;";
generateLogicalPlan( query );
}
@Test
public void test19() throws Exception {
String query = "A = load 'x' as (u:map[], v);\n" +
"B = foreach A { T = (chararray)u#'hello'#'world'; generate T; };";
generateLogicalPlan( query );
}
@Test
public void test20() throws Exception {
String query = "A = load 'x' using PigStorage() as (a:int,b:chararray);\n" +
"B = foreach A { C = TOMAP()#'key1'; generate C as C; };";
generateLogicalPlan( query );
}
@Test
public void test21() throws Exception {
String query = "A = load 'x' as (u, v);\n" +
"B = foreach A { S = u; T = org.apache.pig.builtin.TOMAP(); generate S, T;};";
generateLogicalPlan( query );
}
@Test
public void test22() throws Exception {
String query = "A = (load 'x' as (u, v));\n" +
"B = (group (foreach A generate $0 parallel 5) all);";
generateLogicalPlan( query );
}
@Test
public void test23() throws Exception {
String query = "a = (load 'x1' using PigStorage() as (name, age, gpa));" +
"b = (group a all);" +
"c = (foreach b generate AVG(a.age) as avg); " +
"d = (load 'x2' using PigStorage() as (name, age, registration, contributions));" +
"e = (group d all);" +
"f = (foreach e generate AVG(d.age) as avg);" +
"y = (foreach a generate age/c.avg, age/f.avg);" +
"store y into 'y';";
generateLogicalPlan( query );
}
@Test
public void test24() throws Exception {
String query = "a = (load 'x1' using PigStorage() as (name, age:int, gpa));" +
"b = (load 'x2' as (name, age, registration, contributions));" +
"e = (cogroup a by name, b by name parallel 8);" +
"f = (foreach e generate group, SUM(a.age) as s);" +
"g = (filter f by s>0);" +
"(store g into 'y');";
generateLogicalPlan( query );
}
@Test
public void test25() throws Exception {
String query = "A = (load 'x' as ( u:int, v:long, w:bytearray)); " +
"B = (distinct A partition by org.apache.pig.Identity); " +
"C = (sample B 0.49); " +
"D = (order C by $0, $1); " +
"E = (load 'y' as (d1, d2)); " +
"F = (union onschema D, E); " +
"G = (load 'z' as (g1:int, g2:tuple(g21, g22))); " +
"H = (cross F, G); " +
"split H into I if 10 > 5, J if 'world' eq 'hello', K if 77 <= 200; " +
"L = (store J into 'output');";
generateLogicalPlan( query );
}
@Test
public void testCubeBasic() throws Exception {
String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ "b = cube a by cube(x,y);"
+ "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
+ "store c into 'output';";
generateLogicalPlan(query);
}
@Test
public void testCubeMultipleIAlias() throws Exception {
String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ "a = load 'input' as (x,y:chararray,z:long);"
+ "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ "b = cube a by rollup(x,y);"
+ "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
+ "store c into 'c';";
generateLogicalPlan(query);
}
@Test
public void testCubeAfterForeach() throws Exception {
String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+ "b = foreach a generate x as type,y as location,z as number;"
+ "c = cube b by cube(type,location);"
+ "d = foreach c generate flatten(group) as (type,location), COUNT(cube) as count, SUM(cube.number) as total;"
+ "store d into 'd';";
generateLogicalPlan(query);
}
@Test
public void testFilter1() throws Exception {
String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = filter A by 2 > 1;\n" +
"store B into 'y';";
generateLogicalPlan( query );
}
@Test
public void testFilter2() throws Exception {
generateLogicalPlan(
"A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = filter A by u is null;\n");
}
@Test
public void testFilter3() throws Exception {
generateLogicalPlan(
"A = load 'x' as ( u:int, v:long, w:bytearray); " +
"B = filter A by u is not null;\n");
}
@Test
public void testFilter4() throws Exception {
generateLogicalPlan("b = filter (load 'd.txt' as (id:int, v1, v2)) by (id > 3) AND (v1 is null);");
}
@Test
public void testScopedAlias() throws Exception {
String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
"B = load 'y' as ( u:int, x:int, y:chararray);" +
"C = join A by u, B by u;" +
"D = foreach C generate A::u, B::u, v, x;" +
"store D into 'z';";
generateLogicalPlan ( query );
}
@Test
public void testConstantWithNegativeValue() throws Exception {
String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
"B = foreach A generate u, { ( -1, -15L, -3.5, -4.03F, -2.3e3 ) };" +
"store B into 'y';";
generateLogicalPlan ( query );
}
@Test(expected = NonProjectExpressionException.class)
public void testNegative1() throws Exception {
String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
"B = foreach A { S = c * 2; T = limit S 100; generate T; };" +
"store B into 'y';";
ParserTestingUtils.generateLogicalPlan( query );
}
@Test(expected = MismatchedTokenException.class)
public void testNegative2() throws Exception {
String query = "ship = load 'x';";
try {
ParserTestingUtils.generateLogicalPlan( query );
} catch(Exception ex) {
MismatchedTokenException mex = (MismatchedTokenException)ex;
assertTrue( mex.token.getText().equals("ship") );
throw ex;
}
}
@Test(expected = MismatchedTokenException.class)
public void testNegative3() throws Exception {
String query = "A = load 'y'; all = load 'x';";
try {
ParserTestingUtils.generateLogicalPlan( query );
} catch(Exception ex) {
MismatchedTokenException mex = (MismatchedTokenException)ex;
assertTrue( mex.token.getText().equals("all") );
throw ex;
}
}
@Test
public void testMultilineFunctionArgument() throws Exception {
String query = "LOAD 'testIn' \n" +
"USING PigStorage ('\n');";
generateLogicalPlan( query );
}
@Test
// See PIG-2320
public void testInlineOpInGroup() throws Exception {
String query = "a = load 'data1' as (x:int); \n" +
"a_1 = filter (group a by x) by COUNT(a) > 0;";
generateLogicalPlan( query );
}
@Test
public void testRank01() throws Exception {
String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
+ "B = rank A by sales;" + "store B into 'rank01_test';";
generateLogicalPlan(query);
}
@Test
public void testRank02() throws Exception {
String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
+ "C = rank A by sales DENSE;" + "store C into 'rank02_test';";
generateLogicalPlan(query);
}
@Test
public void testRank03() throws Exception {
String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ "B = rank A;" + "store B into 'rank03_test';";
generateLogicalPlan(query);
}
@Test
public void testRank04() throws Exception {
String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ "C = rank A by postalcode DESC;"
+ "store C into 'rank04_test';";
generateLogicalPlan(query);
}
@Test
public void testRank05() throws Exception {
String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
+ "D = rank A by postalcode DENSE;"
+ "store D into 'rank05_test';";
generateLogicalPlan(query);
}
@Test
public void testRank06() throws Exception {
String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ "C = rank A by x..rz;";
generateLogicalPlan(query);
}
@Test
public void testRank07() throws Exception {
String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ "C = rank A by x ASC, y DESC;";
generateLogicalPlan(query);
}
@Test
public void testRank08() throws Exception {
String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
+ "C = rank A;";
generateLogicalPlan(query);
}
@Test
public void testCast1() throws Exception {
String query = "data = LOAD 'data.txt' AS (num:CHARARRAY);" +
"numbers = FOREACH data GENERATE (INT) num;";
generateLogicalPlan(query);
}
@Test
public void testCast2() throws Exception {
generateLogicalPlan(
"sds = LOAD '/my/data/location' AS (simpleFields:map[], mapFields:map[], listMapFields:map[]); " +
"queries_rand = FOREACH sds GENERATE (CHARARRAY) (mapFields#'page_params'#'query') AS query_string;");
}
@Test
public void testBoolean1() throws Exception {
generateLogicalPlan(
"A = load 'INPUT_FILE' as (id:int, fruit);" +
"B = group A by id;" +
"C = foreach B generate group, " +
"((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
"org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);");
}
@Test
public void testBoolean2() throws Exception {
generateLogicalPlan(
"A = load 'INPUT_FILE' as (id:int, fruit);" +
"B = group A by id;" +
"C = foreach B generate group, " +
"((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
"org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);");
}
@Test
public void testBoolean3() throws Exception {
generateLogicalPlan(
"A = load 'INPUT_FILE' as (id:int, fruit);" +
"B = filter A by id < 5 and ( fruit neq 'cabbage' or id == 17 );");
}
@Test
public void testBoolean4() throws Exception {
generateLogicalPlan(
"a = load '1.txt' as (a0, a1);" +
"b = foreach a generate (a0 is not null ? 0 : 1);");
}
@Test
public void testBoolean5() throws Exception {
generateLogicalPlan(
"a = load '1.txt' as (a0, a1);" +
"b = foreach a generate (a0 is null ? 0 : 2);");
}
@Test
public void testAccumWithRegexp() throws Exception {
generateLogicalPlan(
"A = load 'AccumulatorInput.txt' as (id:int, fruit);" +
"B = group A by id;" +
"C = foreach B generate group, (((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");
}
@Test
public void testMapsideGroupByMultipleColumns() throws Exception {
generateLogicalPlan(
"A = LOAD 'MapSideGroupInput.txt' using org.apache.pig.test.TestCollectedGroup$DummyCollectableLoader() as (id, name, grade);" +
"B = group A by (id, name) using 'collected';");
}
@Test
public void testMapUDF() throws Exception {
generateLogicalPlan(
"A = LOAD 'someData';" +
"B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;" +
"C = foreach B {" +
"generate (double)mymap#'double' as d, " +
"(long)mymap#'long' + (float)mymap#'float' as float_sum, " +
"CONCAT((chararray) mymap#'string', ' World!'), " +
"mymap#'int' * 10, " +
"(bag{tuple()}) mymap#'bag' as mybag, " +
"(tuple()) mymap#'tuple' as mytuple, " +
"(map[])mymap#'map' as mapInMap, " +
"mymap#'dba' as dba;" +
"};");
}
@Test
public void testSimpleMapCast() throws Exception {
generateLogicalPlan(
"a = load 'testSimpleMapCast' as (m);" +
"b = foreach a generate ([int])m;");
}
@Test
public void testComplexCast() throws Exception {
generateLogicalPlan(
"a = load 'testComplexCast' as (m);" +
"b = foreach a generate ([{(i:int,j:int)}])m;");
}
@Test
public void testNullConstant() throws Exception {
generateLogicalPlan(
"a = load 'foo' as (x:int, y:double, str:chararray);" +
"b = foreach a generate {(null)}, ['2'#null];");
}
@Test
public void testEmptyTupConst() throws Exception {
generateLogicalPlan( "a = foreach (load 'b') generate ({});");
}
@Test
public void testJoin1() throws Exception {
generateLogicalPlan(
"A = load 'hat' as (m:map[]);" +
"B = filter A by m#'cond'==1;" +
"C = filter B by m#'key1'==1;" +
"D = filter B by m#'key2'==2;" +
"E = join C by m#'key1', D by m#'key1';");
}
// See: PIG-2937
@Test
public void testRelationAliasInNestedForeachWhereUnspecified() throws Exception {
Data data = resetData(pigServer);
List<Tuple> values = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
values.add(tuple(i % 3 == 0 ? null : "a", "b"));
}
data.set("foo", values);
pigServer.registerQuery("raw_data = load 'foo' using mock.Storage() as (field_a:chararray, field_b:chararray);");
pigServer.registerQuery("records = foreach raw_data {" +
" generated_field = (field_a is null ? '-' : field_b);"+
" GENERATE" +
" field_a," +
" field_b," +
" generated_field; }");
pigServer.registerQuery("use_records = foreach records generate generated_field, CONCAT(generated_field,generated_field);");
Schema expectedSchema = Utils.getSchemaFromString("field_a:chararray, field_b:chararray, generated_field:chararray");
assertEquals(expectedSchema, pigServer.dumpSchema("records"));
for (Iterator<Tuple> it = pigServer.openIterator("records"); it.hasNext();) {
Tuple t = it.next();
String a = (String)t.get(0);
String b = (String)t.get(1);
assertEquals("b", b);
if (a == null) {
assertEquals("-", t.get(2));
} else {
assertEquals("a", a);
assertEquals(b, t.get(2));
}
}
for (Iterator<Tuple> it = pigServer.openIterator("use_records"); it.hasNext();) {
Tuple t = it.next();
String x = (String)t.get(0);
assertEquals(x+x, t.get(1));
}
}
}