blob: 14114518771f3b861c484d82862bb229f148e49a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.ExecType.MAPREDUCE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.pig.PigServer;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.parser.ParserException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Test PORelationToExprProject which is a special project
* introduced to handle the following case:
* This project is Project(*) introduced after a relational operator
* to supply a bag as output (as an expression). This project is either
* providing the bag as input to a successor expression operator or is
* itself the leaf in an inner plan
* If the predecessor relational operator sends an EOP
* then send an empty bag first to signal "empty" output
* and then send an EOP
* NOTE: A Project(*) of return type BAG whose predecessor is
* from an outside plan (i.e. not in the same inner plan as the project)
* will NOT lead us here. So a query like:
* a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
* will go through a regular project (without the following flag)
*/
@RunWith(JUnit4.class)
public class TestRelationToExprProject {

    // Shared MR mini-cluster for every test in this class; shut down once in
    // oneTimeTearDown(). (Previously this class also extended the JUnit 3
    // TestCase while running under the JUnit 4 runner; the superclass was
    // redundant and has been dropped in favor of org.junit.Assert.)
    private static final MiniCluster cluster = MiniCluster.buildCluster();
    private PigServer pigServer;
    private static final String TEST_FILTER_COUNT3_INPUT="test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt";

    /**
     * Creates a fresh PigServer (MapReduce mode) against the shared
     * mini-cluster before each test.
     */
    @Before
    public void setUp() throws Exception {
        pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
    }

    /**
     * Shuts down the per-test PigServer.
     */
    @After
    public void tearDown() throws Exception {
        pigServer.shutdown();
    }

    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        cluster.shutDown();
    }

    /**
     * Drains {@code it}, asserting that every produced tuple is a key of
     * {@code expected} and that no more than {@code expected.size()} tuples
     * are produced; then asserts each expected tuple was seen exactly once.
     * All counts in {@code expected} must start at 0.
     */
    private static void assertEachExpectedTupleSeenOnce(Iterator<Tuple> it,
            Map<Tuple, Integer> expected) {
        int i = 0;
        while (it.hasNext()) {
            if (i == expected.size()) {
                fail("Got more tuples than expected!");
            }
            Tuple t = it.next();
            assertTrue(expected.containsKey(t));
            expected.put(t, expected.get(t) + 1);
            i++;
        }
        for (Integer occurrences : expected.values()) {
            assertEquals(Integer.valueOf(1), occurrences);
        }
    }

    // based on the script provided in the jira issue:PIG-514
    // tests that when a filter inside a foreach filters away all tuples
    // for a group, an empty bag is still provided to udfs whose
    // input is the filter
    @Test
    public void testFilterCount1() throws IOException, ParserException {
        String[] inputData = new String[] {"1\t1\t3", "1\t2\t3", "2\t1\t3", "2\t1\t3"};
        Util.createInputFile(cluster, "test.txt", inputData);
        String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" +
        "test2 = group test by col1;" +
        "test3 = foreach test2 {" +
        " filter_one = filter test by (col2==1);" +
        " filter_notone = filter test by (col2!=1);" +
        " generate group as col1, COUNT(filter_one) as cnt_one, COUNT(filter_notone) as cnt_notone;};";
        Util.registerMultiLineQuery(pigServer, script);
        Iterator<Tuple> it = pigServer.openIterator("test3");
        // Interface-typed array: Util.getPigConstant() is not guaranteed to
        // return DefaultTuple instances, and storing another Tuple
        // implementation through a covariant DefaultTuple[] view would throw
        // ArrayStoreException at runtime.
        Tuple[] expected = new Tuple[2];
        expected[0] = (Tuple) Util.getPigConstant("(1,1L,1L)");
        expected[1] = (Tuple) Util.getPigConstant("(2,2L,0L)");
        Object[] results = new Object[2];
        int i = 0;
        while (it.hasNext()) {
            if (i == 2) {
                fail("Got more tuples than expected!");
            }
            Tuple t = it.next();
            if (t.get(0).equals(1)) {
                // this is the first tuple
                results[0] = t;
            } else {
                results[1] = t;
            }
            i++;
        }
        for (int j = 0; j < expected.length; j++) {
            // assertEquals (not assertTrue(equals)) for a useful failure
            // message showing both tuples
            assertEquals(expected[j], results[j]);
        }
        Util.deleteFile(cluster, "test.txt");
    }

    // based on jira PIG-710
    // tests that when a filter inside a foreach filters away all tuples
    // for a group, an empty bag is still provided to udfs whose
    // input is the filter
    @Test
    public void testFilterCount2() throws IOException, ParserException {
        Util.createInputFile(cluster, "filterbug.data", new String[] {
            "a\thello" ,
            "a\tgoodbye" ,
            "b\tgoodbye" ,
            "c\thello" ,
            "c\thello" ,
            "c\thello" ,
            "d\twhat"
        });
        String query = "A = load 'filterbug.data' using PigStorage() as ( id:chararray, str:chararray );" +
        "B = group A by ( id );" +
        "Cfiltered = foreach B {" +
        " D = filter A by (" +
        " str matches 'hello'" +
        " );" +
        " matchedcount = COUNT(D);" +
        " generate" +
        " group," +
        " matchedcount as matchedcount," +
        " A.str;" +
        " };";
        Util.registerMultiLineQuery(pigServer, query);
        Iterator<Tuple> it = pigServer.openIterator("Cfiltered");
        Map<String, Tuple> expected = new HashMap<String, Tuple>();
        expected.put("a", (Tuple) Util.getPigConstant("('a',1L,{('hello'),('goodbye')})"));
        expected.put("b", (Tuple) Util.getPigConstant("('b',0L,{('goodbye')})"));
        expected.put("c", (Tuple) Util.getPigConstant("('c',3L,{('hello'),('hello'),('hello')})"));
        expected.put("d", (Tuple) Util.getPigConstant("('d',0L,{('what')})"));
        int i = 0;
        while (it.hasNext()) {
            Tuple actual = it.next();
            // look the result up by its group key (field 0)
            assertEquals(expected.get(actual.get(0)), actual);
            i++;
        }
        // all four groups must have produced a result
        assertEquals(4, i);
        Util.deleteFile(cluster, "filterbug.data");
    }

    // based on jira PIG-739
    // tests that when a filter inside a foreach filters away all tuples
    // for a group, an empty bag is still provided to udfs whose
    // input is the filter
    @Test
    public void testFilterCount3() throws IOException, ParserException {
        Util.copyFromLocalToCluster(cluster, TEST_FILTER_COUNT3_INPUT, "testdata");
        // NOTE(review): the nested foreach block below closes with "}" but no
        // ';' before the TESTDATA_AGG_1 statement -- confirm the Pig parser
        // accepts a nested block without a statement terminator.
        String query = "TESTDATA = load 'testdata' using PigStorage() as (timestamp:chararray, testid:chararray, userid: chararray, sessionid:chararray, value:long, flag:int);" +
        "TESTDATA_FILTERED = filter TESTDATA by (timestamp gte '1230800400000' and timestamp lt '1230804000000' and value != 0);" +
        "TESTDATA_GROUP = group TESTDATA_FILTERED by testid;" +
        "TESTDATA_AGG = foreach TESTDATA_GROUP {" +
        " A = filter TESTDATA_FILTERED by (userid eq sessionid);" +
        " C = distinct A.userid;" +
        " generate group as testid, COUNT(TESTDATA_FILTERED) as counttestdata, COUNT(C) as distcount, SUM(TESTDATA_FILTERED.flag) as total_flags;" +
        " }" +
        "TESTDATA_AGG_1 = group TESTDATA_AGG ALL;" +
        "TESTDATA_AGG_2 = foreach TESTDATA_AGG_1 generate COUNT(TESTDATA_AGG);" ;
        Util.registerMultiLineQuery(pigServer, query);
        Iterator<Tuple> it = pigServer.openIterator("TESTDATA_AGG_2");
        int i = 0;
        while (it.hasNext()) {
            Tuple actual = it.next();
            // exactly one global-count tuple with value 20 is expected
            assertEquals(20L, actual.get(0));
            i++;
        }
        assertEquals(1, i);
        Util.deleteFile(cluster, "testdata");
    }

    // test case where RelationToExprProject is present in the
    // single inner plan of foreach - this will test that it does
    // send an EOP eventually for each input of the foreach
    @Test
    public void testFilter1() throws IOException, ParserException {
        String[] inputData = new String[] {"1\t1\t3", "1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"};
        Util.createInputFile(cluster, "test.txt", inputData);
        String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" +
        "test2 = group test by col1;" +
        "test3 = foreach test2 {" +
        " filter_one = filter test by (col2==1);" +
        " generate filter_one;};";
        Util.registerMultiLineQuery(pigServer, script);
        Iterator<Tuple> it = pigServer.openIterator("test3");
        Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
        expected.put((Tuple) Util.getPigConstant("({(1,1,3)})"), 0);
        expected.put((Tuple) Util.getPigConstant("({(2,1,3),(2,1,3)})"), 0);
        // group 3 has no col2==1 records: the project must still emit a tuple
        // containing an empty bag
        Tuple t = TupleFactory.getInstance().newTuple();
        t.append(BagFactory.getInstance().newDefaultBag());
        expected.put(t, 0);
        assertEachExpectedTupleSeenOnce(it, expected);
        Util.deleteFile(cluster, "test.txt");
    }

    // test case where RelationToExprProject is present in a
    // different inner plan along with another plan to project the group
    // in foreach - this will test that reset() correctly resets
    // the state that empty bags need to be sent on EOP if no non-EOP
    // input has been seen on a fresh input from foreach.
    @Test
    public void testFilter2() throws IOException, ParserException {
        String[] inputData = new String[] {"1\t1\t3", "1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"};
        Util.createInputFile(cluster, "test.txt", inputData);
        String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" +
        "test2 = group test by col1;" +
        "test3 = foreach test2 {" +
        " filter_one = filter test by (col2==1);" +
        " generate group, filter_one;};";
        Util.registerMultiLineQuery(pigServer, script);
        Iterator<Tuple> it = pigServer.openIterator("test3");
        Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
        expected.put((Tuple) Util.getPigConstant("(1,{(1,1,3)})"), 0);
        expected.put((Tuple) Util.getPigConstant("(2,{(2,1,3),(2,1,3)})"), 0);
        // group 3 has no col2==1 records: expect (3, {}) with an empty bag
        Tuple t = TupleFactory.getInstance().newTuple();
        t.append(Integer.valueOf(3));
        t.append(BagFactory.getInstance().newDefaultBag());
        expected.put(t, 0);
        assertEachExpectedTupleSeenOnce(it, expected);
        Util.deleteFile(cluster, "test.txt");
    }
}