| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.apache.pig.ExecType.MAPREDUCE; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.Map; |
| |
| import org.apache.pig.PigServer; |
| import org.apache.pig.data.BagFactory; |
| import org.apache.pig.data.DefaultTuple; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.parser.ParserException; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| import junit.framework.TestCase; |
| |
| /** |
| * Test PORelationToExprProject which is a special project |
| * introduced to handle the following case: |
| * This project is Project(*) introduced after a relational operator |
| * to supply a bag as output (as an expression). This project is either |
| * providing the bag as input to a successor expression operator or is |
| * itself the leaf in a inner plan |
| * If the predecessor relational operator sends an EOP |
| * then send an empty bag first to signal "empty" output |
| * and then send an EOP |
| |
| * NOTE: A Project(*) of return type BAG whose predecessor is |
| * from an outside plan (i.e. not in the same inner plan as the project) |
| * will NOT lead us here. So a query like: |
| * a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b; |
| * will go through a regular project (without the following flag) |
| */ |
| @RunWith(JUnit4.class) |
| public class TestRelationToExprProject extends TestCase { |
| |
| private static MiniCluster cluster = MiniCluster.buildCluster(); |
| private PigServer pigServer; |
| private static final String TEST_FILTER_COUNT3_INPUT="test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt"; |
| |
| /* (non-Javadoc) |
| * @see junit.framework.TestCase#setUp() |
| */ |
| @Before |
| public void setUp() throws Exception { |
| pigServer = new PigServer(MAPREDUCE, cluster.getProperties()); |
| } |
| |
| /* (non-Javadoc) |
| * @see junit.framework.TestCase#tearDown() |
| */ |
| @After |
| public void tearDown() throws Exception { |
| pigServer.shutdown(); |
| } |
| |
| @AfterClass |
| public static void oneTimeTearDown() throws Exception { |
| cluster.shutDown(); |
| } |
| |
| // based on the script provided in the jira issue:PIG-514 |
| // tests that when a filter inside a foreach filters away all tuples |
| // for a group, an empty bag is still provided to udfs whose |
| // input is the filter |
| @Test |
| public void testFilterCount1() throws IOException, ParserException { |
| |
| String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3"}; |
| Util.createInputFile(cluster, "test.txt", inputData); |
| String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" + |
| "test2 = group test by col1;" + |
| "test3 = foreach test2 {" + |
| " filter_one = filter test by (col2==1);" + |
| " filter_notone = filter test by (col2!=1);" + |
| " generate group as col1, COUNT(filter_one) as cnt_one, COUNT(filter_notone) as cnt_notone;};"; |
| Util.registerMultiLineQuery(pigServer, script); |
| Iterator<Tuple> it = pigServer.openIterator("test3"); |
| Tuple[] expected = new DefaultTuple[2]; |
| expected[0] = (Tuple) Util.getPigConstant("(1,1L,1L)"); |
| expected[1] = (Tuple) Util.getPigConstant("(2,2L,0L)"); |
| Object[] results = new Object[2]; |
| int i = 0; |
| while(it.hasNext()) { |
| if(i == 2) { |
| fail("Got more tuples than expected!"); |
| } |
| Tuple t = it.next(); |
| if(t.get(0).equals(1)) { |
| // this is the first tuple |
| results[0] = t; |
| } else { |
| results[1] = t; |
| } |
| i++; |
| } |
| for (int j = 0; j < expected.length; j++) { |
| assertTrue(expected[j].equals(results[j])); |
| } |
| Util.deleteFile(cluster, "test.txt"); |
| } |
| |
| // based on jira PIG-710 |
| // tests that when a filter inside a foreach filters away all tuples |
| // for a group, an empty bag is still provided to udfs whose |
| // input is the filter |
| @Test |
| public void testFilterCount2() throws IOException, ParserException { |
| Util.createInputFile(cluster, "filterbug.data", new String[] { |
| "a\thello" , |
| "a\tgoodbye" , |
| "b\tgoodbye" , |
| "c\thello" , |
| "c\thello" , |
| "c\thello" , |
| "d\twhat" |
| }); |
| String query = "A = load 'filterbug.data' using PigStorage() as ( id:chararray, str:chararray );" + |
| "B = group A by ( id );" + |
| "Cfiltered = foreach B {" + |
| " D = filter A by (" + |
| " str matches 'hello'" + |
| " );" + |
| " matchedcount = COUNT(D);" + |
| " generate" + |
| " group," + |
| " matchedcount as matchedcount," + |
| " A.str;" + |
| " };"; |
| Util.registerMultiLineQuery(pigServer, query); |
| Iterator<Tuple> it = pigServer.openIterator("Cfiltered"); |
| Map<String, Tuple> expected = new HashMap<String, Tuple>(); |
| expected.put("a", (Tuple) Util.getPigConstant("('a',1L,{('hello'),('goodbye')})")); |
| expected.put("b", (Tuple) Util.getPigConstant("('b',0L,{('goodbye')})")); |
| expected.put("c", (Tuple) Util.getPigConstant("('c',3L,{('hello'),('hello'),('hello')})")); |
| expected.put("d", (Tuple) Util.getPigConstant("('d',0L,{('what')})")); |
| int i = 0; |
| while(it.hasNext()) { |
| Tuple actual = it.next(); |
| assertEquals(expected.get(actual.get(0)), actual); |
| i++; |
| } |
| assertEquals(4, i); |
| Util.deleteFile(cluster, "filterbug.data"); |
| } |
| |
| // based on jira PIG-739 |
| // tests that when a filter inside a foreach filters away all tuples |
| // for a group, an empty bag is still provided to udfs whose |
| // input is the filter |
| @Test |
| public void testFilterCount3() throws IOException, ParserException { |
| Util.copyFromLocalToCluster(cluster, TEST_FILTER_COUNT3_INPUT, "testdata"); |
| String query = "TESTDATA = load 'testdata' using PigStorage() as (timestamp:chararray, testid:chararray, userid: chararray, sessionid:chararray, value:long, flag:int);" + |
| "TESTDATA_FILTERED = filter TESTDATA by (timestamp gte '1230800400000' and timestamp lt '1230804000000' and value != 0);" + |
| "TESTDATA_GROUP = group TESTDATA_FILTERED by testid;" + |
| "TESTDATA_AGG = foreach TESTDATA_GROUP {" + |
| " A = filter TESTDATA_FILTERED by (userid eq sessionid);" + |
| " C = distinct A.userid;" + |
| " generate group as testid, COUNT(TESTDATA_FILTERED) as counttestdata, COUNT(C) as distcount, SUM(TESTDATA_FILTERED.flag) as total_flags;" + |
| " }" + |
| "TESTDATA_AGG_1 = group TESTDATA_AGG ALL;" + |
| "TESTDATA_AGG_2 = foreach TESTDATA_AGG_1 generate COUNT(TESTDATA_AGG);" ; |
| Util.registerMultiLineQuery(pigServer, query); |
| Iterator<Tuple> it = pigServer.openIterator("TESTDATA_AGG_2"); |
| |
| int i = 0; |
| while(it.hasNext()) { |
| Tuple actual = it.next(); |
| assertEquals(20l, actual.get(0)); |
| i++; |
| } |
| assertEquals(1, i); |
| Util.deleteFile(cluster, "testdata"); |
| } |
| |
| // test case where RelationToExprProject is present in the |
| // single inner plan of foreach - this will test that it does |
| // send an EOP eventually for each input of the foreach |
| @Test |
| public void testFilter1() throws IOException, ParserException { |
| |
| String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"}; |
| Util.createInputFile(cluster, "test.txt", inputData); |
| String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" + |
| "test2 = group test by col1;" + |
| "test3 = foreach test2 {" + |
| " filter_one = filter test by (col2==1);" + |
| " generate filter_one;};"; |
| Util.registerMultiLineQuery(pigServer, script); |
| Iterator<Tuple> it = pigServer.openIterator("test3"); |
| Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>(); |
| expected.put((Tuple) Util.getPigConstant("({(1,1,3)})"), 0); |
| expected.put((Tuple) Util.getPigConstant("({(2,1,3),(2,1,3)})"), 0); |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| t.append(BagFactory.getInstance().newDefaultBag()); |
| expected.put(t, 0); |
| int i = 0; |
| while(it.hasNext()) { |
| if(i == 3) { |
| fail("Got more tuples than expected!"); |
| } |
| t = it.next(); |
| assertTrue(expected.containsKey(t)); |
| int occurences = expected.get(t); |
| occurences++; |
| expected.put(t, occurences); |
| i++; |
| } |
| for (Integer occurences : expected.values()) { |
| assertEquals(new Integer(1), occurences); |
| } |
| Util.deleteFile(cluster, "test.txt"); |
| } |
| |
| // test case where RelationToExprProject is present in a |
| // different inner plan along with another plan to project the group |
| // in foreach - this will test that reset() correctly resets |
| // the state that empty bags need to be sent on EOP if no non-EOP |
| // input has been seen on a fresh input from foreach. |
| @Test |
| public void testFilter2() throws IOException, ParserException { |
| |
| String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"}; |
| Util.createInputFile(cluster, "test.txt", inputData); |
| String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" + |
| "test2 = group test by col1;" + |
| "test3 = foreach test2 {" + |
| " filter_one = filter test by (col2==1);" + |
| " generate group, filter_one;};"; |
| Util.registerMultiLineQuery(pigServer, script); |
| Iterator<Tuple> it = pigServer.openIterator("test3"); |
| Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>(); |
| expected.put((Tuple) Util.getPigConstant("(1,{(1,1,3)})"), 0); |
| expected.put((Tuple) Util.getPigConstant("(2,{(2,1,3),(2,1,3)})"), 0); |
| Tuple t = TupleFactory.getInstance().newTuple(); |
| t.append(new Integer(3)); |
| t.append(BagFactory.getInstance().newDefaultBag()); |
| expected.put(t, 0); |
| int i = 0; |
| while(it.hasNext()) { |
| if(i == 3) { |
| fail("Got more tuples than expected!"); |
| } |
| t = it.next(); |
| assertTrue(expected.containsKey(t)); |
| int occurences = expected.get(t); |
| occurences++; |
| expected.put(t, occurences); |
| i++; |
| } |
| for (Integer occurences : expected.values()) { |
| assertEquals(new Integer(1), occurences); |
| } |
| Util.deleteFile(cluster, "test.txt"); |
| } |
| |
| |
| } |