| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.apache.pig.builtin.mock.Storage.tuple; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Properties; |
| |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.parser.ParserException; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| public class TestAccumulator { |
| private static final String INPUT_FILE1 = "AccumulatorInput1.txt"; |
| private static final String INPUT_FILE2 = "AccumulatorInput2.txt"; |
| private static final String INPUT_FILE3 = "AccumulatorInput3.txt"; |
| private static final String INPUT_FILE4 = "AccumulatorInput4.txt"; |
| private static final String INPUT_DIR = Util.getTestDirectory(TestAccumulator.class); |
| |
| private static PigServer pigServer; |
| private static Properties properties; |
| private static MiniGenericCluster cluster; |
| |
| @BeforeClass |
| public static void oneTimeSetUp() throws Exception { |
| cluster = MiniGenericCluster.buildCluster(); |
| properties = cluster.getProperties(); |
| properties.setProperty("pig.accumulative.batchsize", "2"); |
| properties.setProperty("pig.exec.nocombiner", "true"); |
| // Reducing the number of retry attempts to speed up test completion |
| properties.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS,"1"); |
| properties.setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS,"1"); |
| createFiles(); |
| } |
| |
| @AfterClass |
| public static void oneTimeTearDown() throws Exception { |
| deleteFiles(); |
| cluster.shutDown(); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| Util.resetStateForExecModeSwitch(); |
| // Drop stale configuration from previous test run |
| properties.remove(PigConfiguration.PIG_OPT_ACCUMULATOR); |
| pigServer = new PigServer(cluster.getExecType(), properties); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| pigServer.shutdown(); |
| pigServer = null; |
| } |
| |
| private static void createFiles() throws IOException { |
| new File(INPUT_DIR).mkdirs(); |
| |
| PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1)); |
| |
| w.println("100\tapple"); |
| w.println("200\torange"); |
| w.println("300\tstrawberry"); |
| w.println("300\tpear"); |
| w.println("100\tapple"); |
| w.println("300\tpear"); |
| w.println("400\tapple"); |
| w.close(); |
| |
| Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1); |
| |
| w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2)); |
| |
| w.println("100\t"); |
| w.println("100\t"); |
| w.println("200\t"); |
| w.println("200\t"); |
| w.println("300\tstrawberry"); |
| w.close(); |
| |
| Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2); |
| |
| w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE3)); |
| |
| w.println("100\t1.0"); |
| w.println("100\t2.0"); |
| w.println("200\t1.1"); |
| w.println("200\t2.1"); |
| w.println("100\t3.0"); |
| w.println("100\t4.0"); |
| w.println("200\t3.1"); |
| w.println("100\t5.0"); |
| w.println("300\t3.3"); |
| w.println("400\t"); |
| w.println("400\t"); |
| w.close(); |
| |
| Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3); |
| |
| w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE4)); |
| |
| w.println("100\thttp://ibm.com,ibm"); |
| w.println("100\thttp://ibm.com,ibm"); |
| w.println("200\thttp://yahoo.com,yahoo"); |
| w.println("300\thttp://sun.com,sun"); |
| w.close(); |
| |
| Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE4, INPUT_FILE4); |
| } |
| |
| private static void deleteFiles() { |
| Util.deleteDirectory(new File(INPUT_DIR)); |
| } |
| |
| @Test |
| public void testAccumBasic() throws IOException{ |
| // test group by |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 2); |
| expected.put(200, 1); |
| expected.put(300, 3); |
| expected.put(400, 1); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);"); |
| |
| try{ |
| iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| fail("accumulator should not be called."); |
| }catch(IOException e) { |
| // should throw exception from AccumulatorBagCount. |
| } |
| |
| // test cogroup |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("C = cogroup A by id, B by id;"); |
| pigServer.registerQuery("D = foreach C generate group, " + |
| "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);"); |
| |
| HashMap<Integer, String> expected2 = new HashMap<Integer, String>(); |
| expected2.put(100, "2,2"); |
| expected2.put(200, "1,1"); |
| expected2.put(300, "3,3"); |
| expected2.put(400, "1,1"); |
| |
| iter = pigServer.openIterator("D"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString()+","+t.get(2).toString()); |
| } |
| } |
| |
| @Test |
| public void testAccumWithNegative() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, -org.apache.pig.test.utils.AccumulatorBagCount(A);"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, -2); |
| expected.put(200, -1); |
| expected.put(300, -3); |
| expected.put(400, -1); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| |
| @Test |
| public void testAccumWithAdd() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;"); |
| |
| { |
| HashMap<Integer, Double> expected = new HashMap<Integer, Double>(); |
| expected.put(100, 3.0); |
| expected.put(200, 2.0); |
| expected.put(300, 4.0); |
| expected.put(400, 2.0); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1)); |
| } |
| } |
| |
| { |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);"); |
| |
| HashMap<Integer, Integer>expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 4); |
| expected.put(200, 2); |
| expected.put(300, 6); |
| expected.put(400, 2); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| } |
| |
| @Test |
| public void testAccumWithMinus() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);"); |
| |
| HashMap<Integer, Double> expected = new HashMap<Integer, Double>(); |
| expected.put(100, 4.0); |
| expected.put(200, 2.0); |
| expected.put(300, 6.0); |
| expected.put(400, 2.0); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1)); |
| } |
| } |
| |
| @Test |
| public void testAccumWithMod() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 0); |
| expected.put(200, 1); |
| expected.put(300, 1); |
| expected.put(400, 1); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| |
| @Test |
| public void testAccumWithDivide() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 1); |
| expected.put(200, 0); |
| expected.put(300, 1); |
| expected.put(400, 0); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| |
| @Test |
| public void testAccumWithAnd() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " + |
| "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 0); |
| expected.put(200, 1); |
| expected.put(300, 1); |
| expected.put(400, 1); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| |
| @Test |
| public void testAccumWithOr() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " + |
| "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 1); |
| expected.put(200, 0); |
| expected.put(300, 1); |
| expected.put(400, 0); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| |
| @Test |
| public void testAccumWithRegexp() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 1); |
| expected.put(200, 0); |
| expected.put(300, 1); |
| expected.put(400, 0); |
| |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| |
| @Test |
| public void testAccumWithIsNull() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group, " + |
| "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 0); |
| expected.put(200, 0); |
| expected.put(300, 1); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Test |
| public void testAccumWithIsEmpty() throws IOException{ |
| pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "1"); |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("C = cogroup A by id outer, B by id outer;"); |
| pigServer.registerQuery("D = foreach C generate group," + |
| "(int)(IsEmpty(A) ? 0 : SUM(A.id)) as suma," + |
| "(int)(IsEmpty(B) ? 0 : SUM(B.id)) as sumb;"); |
| |
| List<Tuple> expected = new ArrayList<Tuple>(); |
| expected.add(tuple(100, 200, 200)); |
| expected.add(tuple(200, 200, 400)); |
| expected.add(tuple(300, 900, 300)); |
| expected.add(tuple(400, 400, 0)); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| List<Tuple> actual = new ArrayList<Tuple>(); |
| |
| while(iter.hasNext()) { |
| actual.add(iter.next()); |
| } |
| Collections.sort(actual); |
| assertEquals(expected, actual); |
| } |
| |
| @Test |
| public void testAccumWithDistinct() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B { D = distinct A;" + |
| "generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};"); |
| |
| HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); |
| expected.put(100, 2); |
| expected.put(200, 2); |
| expected.put(300, 3); |
| expected.put(400, 2); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| int count = 0; |
| while(iter.hasNext()) { |
| count++; |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); |
| } |
| assertEquals(4, count); |
| } |
| |
| @Test |
| public void testAccumWithSort() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);"); |
| pigServer.registerQuery("B = foreach A generate id, f, id as t;"); |
| pigServer.registerQuery("C = group B by id;"); |
| pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f;" + |
| "generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};"); |
| |
| HashMap<Integer, String> expected = new HashMap<Integer, String>(); |
| expected.put(100, "(apple)(apple)"); |
| expected.put(200, "(orange)"); |
| expected.put(300, "(pear)(pear)(strawberry)"); |
| expected.put(400, "(apple)"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| |
| int count = 0; |
| while(iter.hasNext()) { |
| count++; |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1)); |
| } |
| assertEquals(4, count); |
| } |
| |
| @Test |
| public void testAccumWithBuiltinAvg() throws IOException { |
| HashMap<Integer, Double> expected = new HashMap<Integer, Double>(); |
| expected.put(100, 3.0); |
| expected.put(200, 2.1); |
| expected.put(300, 3.3); |
| expected.put(400, null); |
| // Test all the averages for correct behaviour with null values |
| String[] types = { "double", "float", "int", "long" }; |
| for (int i = 0; i < types.length; i++) { |
| if (i > 1) { // adjust decimal error for non real types |
| expected.put(200, 2.0); |
| expected.put(300, 3.0); |
| } |
| pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:" |
| + types[i] + ");"); |
| pigServer.registerQuery("C = group A by id;"); |
| pigServer.registerQuery("D = foreach C generate group, AVG(A.v);"); |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| |
| while (iter.hasNext()) { |
| Tuple t = iter.next(); |
| Double v = expected.get((Integer) t.get(0)); |
| if (v != null) { |
| assertEquals(v.doubleValue(), ((Number) t.get(1)).doubleValue(), |
| 0.0001); |
| } else { |
| assertNull(t.get(1)); |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testAccumWithBuiltin() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);"); |
| pigServer.registerQuery("C = group A by id;"); |
| // moving AVG accumulator test to separate test case |
| pigServer.registerQuery("D = foreach C generate group, SUM(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);"); |
| |
| HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>(); |
| expected.put(100, new Double[]{15.0, 5.0, 1.0, 5.0}); |
| expected.put(200, new Double[]{6.3, 3.0, 1.1, 3.1}); |
| expected.put(300, new Double[]{3.3, 1.0, 3.3, 3.3}); |
| expected.put(400, new Double[] { null, 0.0, null, null }); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| Double[] v = expected.get((Integer)t.get(0)); |
| for(int i=0; i<v.length; i++) { |
| if (v[i] != null) { |
| assertEquals(v[i].doubleValue(), ((Number) t.get(i + 1)) |
| .doubleValue(), 0.0001); |
| } else { |
| assertNull(t.get(i + 1)); |
| } |
| } |
| } |
| } |
| |
| // Pig 4365 |
| @Test |
| public void testAccumWithTOP() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);"); |
| pigServer.registerQuery("B = group A all;"); |
| pigServer.registerQuery("D = foreach B { C = TOP(5, 0, A); generate flatten(C); }"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| |
| List<Tuple> expected = Util.getTuplesFromConstantTupleStrings( |
| new String[] {"(200,1.1)", "(200,2.1)", "(300,3.3)", "(400,null)", "(400,null)" }); |
| |
| Util.checkQueryOutputsAfterSort(iter, expected); |
| } |
| |
| @Test |
| public void testAccumWithMultiBuiltin() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);"); |
| pigServer.registerQuery("C = group A by 1;"); |
| pigServer.registerQuery("D = foreach C generate SUM(A.id), 1+SUM(A.id)+SUM(A.id);"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| t.get(0).toString().equals("1700"); |
| t.get(1).toString().equals("3401"); |
| } |
| } |
| |
| // Pig 1105 |
| @Test |
| public void testAccumCountStar() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);"); |
| pigServer.registerQuery("C = group A by id;"); |
| pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);"); |
| |
| pigServer.openIterator("D"); |
| } |
| |
| /** |
| * see PIG-1963. |
| * If there is a POSort or PODistinct still remaining in the plan |
| * (after secondary sort optimization), accumulative mode can't |
| * be used as they are blocking operators |
| * @throws IOException |
| */ |
| @Test |
| public void testAccumulatorOffOnSort() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| //one POSort will remain because secondary sort can be used only for one of them |
| pigServer.registerQuery("C = foreach B " + |
| "{ " + |
| " o1 = order A by fruit;" + |
| " o2 = order A by fruit desc;" + |
| " generate org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " + |
| " org.apache.pig.test.utils.AccumulativeSumBag(o2.fruit); " + |
| "};"); |
| |
| checkAccumulatorOff("C"); |
| } |
| |
| /** |
| * see PIG-1963. |
| * If there is a POSort or PODistinct still remaining in the plan |
| * (after secondary sort optimization), accumulative mode can't |
| * be used as they are blocking operators |
| * @throws IOException |
| */ |
| @Test |
| public void testAccumulatorOffOnDistinct() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit, category);"); |
| pigServer.registerQuery("B = group A by id;"); |
| |
| pigServer.registerQuery("C = foreach B " + |
| "{ " + |
| " o1 = order A by fruit;" + |
| " d2 = distinct A.category;" + |
| " generate org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " + |
| " org.apache.pig.test.utils.AccumulativeSumBag(d2); " + |
| "};"); |
| |
| checkAccumulatorOff("C"); |
| } |
| |
| @Test |
| public void testAccumulatorOff() throws IOException{ |
| pigServer.getPigContext().getProperties().setProperty( |
| PigConfiguration.PIG_OPT_ACCUMULATOR, "false"); |
| |
| pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = group A by id;"); |
| pigServer.registerQuery("C = foreach B generate group," + |
| "org.apache.pig.test.utils.AccumulativeSumBag(A);"); |
| |
| checkAccumulatorOff("C"); |
| pigServer.getPigContext().getProperties().setProperty( |
| PigConfiguration.PIG_OPT_ACCUMULATOR, "true"); |
| |
| } |
| |
| private void checkAccumulatorOff(String alias) { |
| try { |
| Iterator<Tuple> iter = pigServer.openIterator(alias); |
| while(iter.hasNext()) { |
| iter.next(); |
| } |
| fail("Accumulator should be off."); |
| }catch(Exception e) { |
| // we should get exception |
| } |
| } |
| |
| @Test |
| public void testAccumWithMap() throws IOException{ |
| pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);"); |
| pigServer.registerQuery("B = group A by (id, url);"); |
| pigServer.registerQuery("C = foreach B generate COUNT(A)," + |
| "org.apache.pig.test.utils.URLPARSE(group.url)#'url';"); |
| |
| HashMap<Integer, String> expected = new HashMap<Integer, String>(); |
| expected.put(2, "http://ibm.com"); |
| expected.put(1, "http://yahoo.com"); |
| expected.put(1, "http://sun.com"); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("C"); |
| |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Long)t.get(0)), (String)t.get(1)); |
| } |
| } |
| |
| @Test // PIG-1837 |
| public void testBinCondCheck() throws Exception { |
| PrintWriter w = new PrintWriter(new FileWriter("data1")); |
| w.println("1\t11"); |
| w.println("2\t15"); |
| w.close(); |
| |
| Util.copyFromLocalToCluster(cluster, "data1", "data1"); |
| |
| w = new PrintWriter(new FileWriter("data2")); |
| w.println("1\t10"); |
| w.println("4\t11"); |
| w.println("5\t10"); |
| w.close(); |
| |
| Util.copyFromLocalToCluster(cluster, "data2", "data2"); |
| |
| pigServer.registerQuery("A = load 'data1' as (x:int, y:int);"); |
| pigServer.registerQuery("B = load 'data2' as (x:int, z:int);"); |
| pigServer.registerQuery("C = cogroup A by x, B by x;"); |
| pigServer.registerQuery("D = foreach C generate group," + |
| "SUM((IsEmpty(A.y) ? {(0)} : A.y)) + SUM((IsEmpty(B.z) ? {(0)} : B.z));"); |
| |
| HashMap<Integer, Long> expected = new HashMap<Integer, Long>(); |
| expected.put(1, 21l); |
| expected.put(2, 15l); |
| expected.put(4, 11l); |
| expected.put(5, 10l); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| while(iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertEquals(expected.get((Integer)t.get(0)), (Long)t.get(1)); |
| } |
| } |
| |
| /** |
| * see PIG-1911 . |
| * accumulator udf reading from a nested relational op. generate projects |
| * only the accumulator udf. |
| * @throws IOException |
| * @throws ParseException |
| */ |
| @Test |
| public void testAccumAfterNestedOp() throws IOException, ParserException{ |
| // test group by |
| pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);"); |
| pigServer.registerQuery("B = foreach A generate id;"); //additional test to enable columnprune(PIG-5224) |
| pigServer.registerQuery("C = group B by id;"); |
| pigServer.registerQuery("D = foreach C " + |
| "{ o = order B by id; " + |
| " generate org.apache.pig.test.utils.AccumulatorBagCount(o);}; "); |
| |
| Iterator<Tuple> iter = pigServer.openIterator("D"); |
| List<Tuple> expectedRes = |
| Util.getTuplesFromConstantTupleStrings( |
| new String[] { |
| "(2)", |
| "(1)", |
| "(3)", |
| "(1)" |
| }); |
| Util.checkQueryOutputsAfterSort(iter, expectedRes); |
| } |
| |
| |
| } |