blob: 7b0a6f1101003e6fe72205c2c754ddf2f0514468 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.parser.ParserException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test PORelationToExprProject which is a special project
* introduced to handle the following case:
* This project is Project(*) introduced after a relational operator
* to supply a bag as output (as an expression). This project is either
* providing the bag as input to a successor expression operator or is
* itself the leaf in a inner plan
* If the predecessor relational operator sends an EOP
* then send an empty bag first to signal "empty" output
* and then send an EOP
* NOTE: A Project(*) of return type BAG whose predecessor is
* from an outside plan (i.e. not in the same inner plan as the project)
* will NOT lead us here. So a query like:
* a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
* will go through a regular project (without the following flag)
*/
public class TestRelationToExprProject {
private PigServer pigServer;
private static final String TEST_FILTER_COUNT3_INPUT="test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt";
/* (non-Javadoc)
* @see junit.framework.TestCase#setUp()
*/
@Before
public void setUp() throws Exception {
pigServer = new PigServer(Util.getLocalTestMode());
}
/* (non-Javadoc)
* @see junit.framework.TestCase#tearDown()
*/
@After
public void tearDown() throws Exception {
pigServer.shutdown();
}
// based on the script provided in the jira issue:PIG-514
// tests that when a filter inside a foreach filters away all tuples
// for a group, an empty bag is still provided to udfs whose
// input is the filter
@Test
public void testFilterCount1() throws IOException, ParserException {
Data data = resetData(pigServer);
data.set("foo", tuple(1,1,3), tuple(1,2,3), tuple(2,1,3), tuple(2,1,3));
String script = "test = load 'foo' using mock.Storage() as (col1: int, col2: int, col3: int);" +
"test2 = group test by col1;" +
"test3 = foreach test2 {" +
" filter_one = filter test by (col2==1);" +
" filter_notone = filter test by (col2!=1);" +
" generate group as col1, COUNT(filter_one) as cnt_one, COUNT(filter_notone) as cnt_notone;};";
pigServer.registerQuery(script);
Iterator<Tuple> it = pigServer.openIterator("test3");
Tuple[] expected = new DefaultTuple[2];
expected[0] = (Tuple) Util.getPigConstant("(1,1L,1L)");
expected[1] = (Tuple) Util.getPigConstant("(2,2L,0L)");
Object[] results = new Object[2];
int i = 0;
while(it.hasNext()) {
if(i == 2) {
fail("Got more tuples than expected!");
}
Tuple t = it.next();
if(t.get(0).equals(1)) {
// this is the first tuple
results[0] = t;
} else {
results[1] = t;
}
i++;
}
for (int j = 0; j < expected.length; j++) {
assertTrue(expected[j].equals(results[j]));
}
}
// based on jira PIG-710
// tests that when a filter inside a foreach filters away all tuples
// for a group, an empty bag is still provided to udfs whose
// input is the filter
@Test
public void testFilterCount2() throws IOException, ParserException {
Data data = resetData(pigServer);
data.set("foo",
tuple("a", "hello"),
tuple("a", "goodbye"),
tuple("b", "goodbye"),
tuple("c", "hello"),
tuple("c", "hello"),
tuple("c", "hello"),
tuple("d", "what")
);
String query = "A = load 'foo' using mock.Storage() as ( id:chararray, str:chararray );" +
"B = group A by ( id );" +
"Cfiltered = foreach B {" +
" D = filter A by (" +
" str matches 'hello'" +
" );" +
" matchedcount = COUNT(D);" +
" generate" +
" group," +
" matchedcount as matchedcount," +
" A.str;" +
" };";
pigServer.registerQuery(query);
Iterator<Tuple> it = pigServer.openIterator("Cfiltered");
Map<String, Tuple> expected = new HashMap<String, Tuple>();
expected.put("a", (Tuple) Util.getPigConstant("('a',1L,{('hello'),('goodbye')})"));
expected.put("b", (Tuple) Util.getPigConstant("('b',0L,{('goodbye')})"));
expected.put("c", (Tuple) Util.getPigConstant("('c',3L,{('hello'),('hello'),('hello')})"));
expected.put("d", (Tuple) Util.getPigConstant("('d',0L,{('what')})"));
int i = 0;
while(it.hasNext()) {
Tuple actual = it.next();
assertEquals(expected.get(actual.get(0)), actual);
i++;
}
assertEquals(4, i);
}
// based on jira PIG-739
// tests that when a filter inside a foreach filters away all tuples
// for a group, an empty bag is still provided to udfs whose
// input is the filter
@Test
public void testFilterCount3() throws IOException, ParserException {
String query = "TESTDATA = load '"+TEST_FILTER_COUNT3_INPUT+"' using PigStorage() as (timestamp:chararray, testid:chararray, userid: chararray, sessionid:chararray, value:long, flag:int);" +
"TESTDATA_FILTERED = filter TESTDATA by (timestamp gte '1230800400000' and timestamp lt '1230804000000' and value != 0);" +
"TESTDATA_GROUP = group TESTDATA_FILTERED by testid;" +
"TESTDATA_AGG = foreach TESTDATA_GROUP {" +
" A = filter TESTDATA_FILTERED by (userid eq sessionid);" +
" C = distinct A.userid;" +
" generate group as testid, COUNT(TESTDATA_FILTERED) as counttestdata, COUNT(C) as distcount, SUM(TESTDATA_FILTERED.flag) as total_flags;" +
" }" +
"TESTDATA_AGG_1 = group TESTDATA_AGG ALL;" +
"TESTDATA_AGG_2 = foreach TESTDATA_AGG_1 generate COUNT(TESTDATA_AGG);" ;
pigServer.registerQuery(query);
Iterator<Tuple> it = pigServer.openIterator("TESTDATA_AGG_2");
int i = 0;
while(it.hasNext()) {
Tuple actual = it.next();
assertEquals(20l, actual.get(0));
i++;
}
assertEquals(1, i);
}
// test case where RelationToExprProject is present in the
// single inner plan of foreach - this will test that it does
// send an EOP eventually for each input of the foreach
@Test
public void testFilter1() throws IOException, ParserException {
Data data = resetData(pigServer);
data.set("foo", tuple(1,1,3), tuple(1,2,3), tuple(2,1,3), tuple(2,1,3), tuple(3,4,4));
String script = "test = load 'foo' using mock.Storage() as (col1: int, col2: int, col3: int);" +
"test2 = group test by col1;" +
"test3 = foreach test2 {" +
" filter_one = filter test by (col2==1);" +
" generate filter_one;};";
pigServer.registerQuery(script);
Iterator<Tuple> it = pigServer.openIterator("test3");
Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
expected.put((Tuple) Util.getPigConstant("({(1,1,3)})"), 0);
expected.put((Tuple) Util.getPigConstant("({(2,1,3),(2,1,3)})"), 0);
Tuple t = TupleFactory.getInstance().newTuple();
t.append(BagFactory.getInstance().newDefaultBag());
expected.put(t, 0);
int i = 0;
while(it.hasNext()) {
if(i == 3) {
fail("Got more tuples than expected!");
}
t = it.next();
assertTrue(expected.containsKey(t));
int occurences = expected.get(t);
occurences++;
expected.put(t, occurences);
i++;
}
for (Integer occurences : expected.values()) {
assertEquals(new Integer(1), occurences);
}
}
// test case where RelationToExprProject is present in a
// different inner plan along with another plan to project the group
// in foreach - this will test that reset() correctly resets
// the state that empty bags need to be sent on EOP if no non-EOP
// input has been seen on a fresh input from foreach.
@Test
public void testFilter2() throws IOException, ParserException {
Data data = resetData(pigServer);
data.set("foo", tuple(1,1,3), tuple(1,2,3), tuple(2,1,3), tuple(2,1,3), tuple(3,4,4));
String script = "test = load 'foo' using mock.Storage() as (col1: int, col2: int, col3: int);" +
"test2 = group test by col1;" +
"test3 = foreach test2 {" +
" filter_one = filter test by (col2==1);" +
" generate group, filter_one;};";
Util.registerMultiLineQuery(pigServer, script);
Iterator<Tuple> it = pigServer.openIterator("test3");
Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
expected.put((Tuple) Util.getPigConstant("(1,{(1,1,3)})"), 0);
expected.put((Tuple) Util.getPigConstant("(2,{(2,1,3),(2,1,3)})"), 0);
Tuple t = TupleFactory.getInstance().newTuple();
t.append(new Integer(3));
t.append(BagFactory.getInstance().newDefaultBag());
expected.put(t, 0);
int i = 0;
while(it.hasNext()) {
if(i == 3) {
fail("Got more tuples than expected!");
}
t = it.next();
assertTrue(expected.containsKey(t));
int occurences = expected.get(t);
occurences++;
expected.put(t, occurences);
i++;
}
for (Integer occurences : expected.values()) {
assertEquals(Integer.valueOf(1), occurences);
}
}
}