blob: a5994ded169c8cde02a59dfbb6de72f9c42a8d1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.parser.ParserException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestAccumulator {
// Names of the input files written by createFiles() and uploaded to the cluster under the same name.
private static final String INPUT_FILE1 = "AccumulatorInput1.txt";
private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
// Local directory in which the input files are written before being copied to the cluster.
private static final String INPUT_DIR = Util.getTestDirectory(TestAccumulator.class);
// Created in setUp() before every test and shut down in tearDown().
private static PigServer pigServer;
// Properties obtained from the cluster in oneTimeSetUp(); handed to every PigServer built in setUp().
private static Properties properties;
// Cluster built once in oneTimeSetUp() and shut down in oneTimeTearDown().
private static MiniGenericCluster cluster;
/**
 * Builds the shared cluster, tunes its properties for the accumulator tests
 * (batch size 2, combiner disabled, a single map/reduce attempt) and writes
 * the input files.
 *
 * @throws Exception if the cluster cannot be built or the input files cannot be created
 */
@BeforeClass
public static void oneTimeSetUp() throws Exception {
    cluster = MiniGenericCluster.buildCluster();

    final Properties clusterProps = cluster.getProperties();
    clusterProps.setProperty("pig.accumulative.batchsize", "2");
    clusterProps.setProperty("pig.exec.nocombiner", "true");
    // Only one attempt per task so that tests expecting a failure finish quickly
    clusterProps.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
    clusterProps.setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS, "1");
    properties = clusterProps;

    createFiles();
}
/**
 * Runs once after all tests: removes the local input directory via
 * deleteFiles() and shuts the shared cluster down.
 *
 * @throws Exception if cleanup or cluster shutdown fails
 */
@AfterClass
public static void oneTimeTearDown() throws Exception {
deleteFiles();
cluster.shutDown();
}
/**
 * Prepares a fresh {@link PigServer} for each test.
 *
 * Settings that individual tests change through the server's properties are
 * reset here first, so that no test depends on which test ran before it.
 *
 * @throws Exception if the server cannot be created
 */
@Before
public void setUp() throws Exception {
    Util.resetStateForExecModeSwitch();
    // Drop stale configuration from previous test run
    properties.remove(PigConfiguration.PIG_OPT_ACCUMULATOR);
    // testAccumWithIsEmpty lowers the batch size to 1 through the PigContext properties;
    // restore the class-wide value chosen in oneTimeSetUp() so it cannot leak into other tests.
    properties.setProperty("pig.accumulative.batchsize", "2");
    pigServer = new PigServer(cluster.getExecType(), properties);
}
/**
 * Shuts down the per-test PigServer and clears the reference so the next
 * setUp() starts from a clean slate.
 *
 * @throws Exception if the server fails to shut down
 */
@After
public void tearDown() throws Exception {
pigServer.shutdown();
pigServer = null;
}
/**
 * Writes the four tab-separated input files into {@code INPUT_DIR} and
 * copies each one to the cluster under its bare file name.
 *
 * @throws IOException if the directory or a file cannot be created, written or uploaded
 */
private static void createFiles() throws IOException {
    File inputDir = new File(INPUT_DIR);
    // mkdirs() also returns false when the directory already exists, hence the second check
    if (!inputDir.mkdirs() && !inputDir.isDirectory()) {
        throw new IOException("Could not create test input directory " + inputDir);
    }

    // id / fruit pairs with repeated ids
    writeInputFile(INPUT_FILE1,
            "100\tapple",
            "200\torange",
            "300\tstrawberry",
            "300\tpear",
            "100\tapple",
            "300\tpear",
            "400\tapple");

    // ids whose second column is mostly empty
    writeInputFile(INPUT_FILE2,
            "100\t",
            "100\t",
            "200\t",
            "200\t",
            "300\tstrawberry");

    // id / numeric value pairs; id 400 only has empty values
    writeInputFile(INPUT_FILE3,
            "100\t1.0",
            "100\t2.0",
            "200\t1.1",
            "200\t2.1",
            "100\t3.0",
            "100\t4.0",
            "200\t3.1",
            "100\t5.0",
            "300\t3.3",
            "400\t",
            "400\t");

    // id / "url,name" pairs
    writeInputFile(INPUT_FILE4,
            "100\thttp://ibm.com,ibm",
            "100\thttp://ibm.com,ibm",
            "200\thttp://yahoo.com,yahoo",
            "300\thttp://sun.com,sun");
}

/**
 * Writes {@code lines} to {@code INPUT_DIR/fileName}, one per line, and then
 * copies the file to the cluster as {@code fileName}.
 *
 * @param fileName name of the file, both locally (inside INPUT_DIR) and on the cluster
 * @param lines    content of the file, one entry per line
 * @throws IOException if writing or uploading fails
 */
private static void writeInputFile(String fileName, String... lines) throws IOException {
    String localPath = INPUT_DIR + "/" + fileName;
    PrintWriter w = new PrintWriter(new FileWriter(localPath));
    try {
        for (String line : lines) {
            w.println(line);
        }
        // PrintWriter never throws on write errors; ask for them explicitly
        if (w.checkError()) {
            throw new IOException("Failed to write test input file " + localPath);
        }
    } finally {
        w.close();
    }
    Util.copyFromLocalToCluster(cluster, localPath, fileName);
}
/**
 * Deletes the local directory INPUT_DIR that createFiles() populated.
 */
private static void deleteFiles() {
Util.deleteDirectory(new File(INPUT_DIR));
}
/**
 * Covers three scenarios with the AccumulatorBagCount test UDF: a plain
 * group-by, a foreach that also calls the BagCount UDF (expected to fail
 * with an IOException), and a cogroup with one call per input.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumBasic() throws IOException {
    // Scenario 1: group by
    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);");

    HashMap<Integer, Integer> rowsPerId = new HashMap<Integer, Integer>();
    rowsPerId.put(100, 2);
    rowsPerId.put(200, 1);
    rowsPerId.put(300, 3);
    rowsPerId.put(400, 1);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer count = (Integer) row.get(1);
        assertEquals(rowsPerId.get(id), count);
    }

    // Scenario 2: AccumulatorBagCount next to BagCount; reaching fail() means no exception was raised
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery("C = foreach B generate group, " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);");
    try {
        for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
            Tuple row = rows.next();
            Integer id = (Integer) row.get(0);
            Integer count = (Integer) row.get(1);
            assertEquals(rowsPerId.get(id), count);
        }
        fail("accumulator should not be called.");
    } catch (IOException expectedFailure) {
        // should throw exception from AccumulatorBagCount.
    }

    // Scenario 3: cogroup of the same input with itself
    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("C = cogroup A by id, B by id;");
    pigServer.registerQuery("D = foreach C generate group, " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");

    HashMap<Integer, String> countPairsPerId = new HashMap<Integer, String>();
    countPairsPerId.put(100, "2,2");
    countPairsPerId.put(200, "1,1");
    countPairsPerId.put(300, "3,3");
    countPairsPerId.put(400, "1,1");

    for (Iterator<Tuple> rows = pigServer.openIterator("D"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        String countPair = row.get(1).toString() + "," + row.get(2).toString();
        assertEquals(countPairsPerId.get(id), countPair);
    }
}
/**
 * Applies a unary minus to the AccumulatorBagCount result and expects the
 * negated row count for every id.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithNegative() throws IOException {
    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery("C = foreach B generate group, -org.apache.pig.test.utils.AccumulatorBagCount(A);");

    HashMap<Integer, Integer> negatedCounts = new HashMap<Integer, Integer>();
    negatedCounts.put(100, -2);
    negatedCounts.put(200, -1);
    negatedCounts.put(300, -3);
    negatedCounts.put(400, -1);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(negatedCounts.get(id), value);
    }
}
/**
 * Uses the AccumulatorBagCount result as an operand of '+': first with a
 * double constant, then with a second call of the same UDF.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithAdd() throws IOException {
    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");

    // count + 1.0 yields a double
    pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");
    HashMap<Integer, Double> countPlusOne = new HashMap<Integer, Double>();
    countPlusOne.put(100, 3.0);
    countPlusOne.put(200, 2.0);
    countPlusOne.put(300, 4.0);
    countPlusOne.put(400, 2.0);
    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Double value = (Double) row.get(1);
        assertEquals(countPlusOne.get(id), value);
    }

    // count + count stays an int
    pigServer.registerQuery("C = foreach B generate group, " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);");
    HashMap<Integer, Integer> doubledCount = new HashMap<Integer, Integer>();
    doubledCount.put(100, 4);
    doubledCount.put(200, 2);
    doubledCount.put(300, 6);
    doubledCount.put(400, 2);
    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(doubledCount.get(id), value);
    }
}
/**
 * Evaluates count*3.0 - count with two AccumulatorBagCount calls and expects
 * twice the row count, as a double, for every id.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithMinus() throws IOException {
    String projection = "C = foreach B generate group, " +
            " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(projection);

    HashMap<Integer, Double> wanted = new HashMap<Integer, Double>();
    wanted.put(100, 4.0);
    wanted.put(200, 2.0);
    wanted.put(300, 6.0);
    wanted.put(400, 2.0);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Double value = (Double) row.get(1);
        assertEquals(wanted.get(id), value);
    }
}
/**
 * Takes the AccumulatorBagCount result modulo 2 and checks the remainder
 * for every id.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithMod() throws IOException {
    String projection = "C = foreach B generate group, " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(projection);

    HashMap<Integer, Integer> remainders = new HashMap<Integer, Integer>();
    remainders.put(100, 0);
    remainders.put(200, 1);
    remainders.put(300, 1);
    remainders.put(400, 1);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(remainders.get(id), value);
    }
}
/**
 * Divides the AccumulatorBagCount result by 2 and checks the integer
 * quotient for every id.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithDivide() throws IOException {
    String projection = "C = foreach B generate group, " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(projection);

    HashMap<Integer, Integer> quotients = new HashMap<Integer, Integer>();
    quotients.put(100, 1);
    quotients.put(200, 0);
    quotients.put(300, 1);
    quotients.put(400, 0);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(quotients.get(id), value);
    }
}
/**
 * Combines two AccumulatorBagCount comparisons with 'and' inside a bincond:
 * the result is 0 when the count is strictly between 1 and 3, otherwise 1.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithAnd() throws IOException {
    String projection = "C = foreach B generate group, " +
            "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(projection);

    HashMap<Integer, Integer> flags = new HashMap<Integer, Integer>();
    flags.put(100, 0);
    flags.put(200, 1);
    flags.put(300, 1);
    flags.put(400, 1);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(flags.get(id), value);
    }
}
/**
 * Combines two AccumulatorBagCount comparisons with 'or' inside a bincond:
 * the result is 0 when the count is above 3 or below 2, otherwise 1.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithOr() throws IOException {
    String projection = "C = foreach B generate group, " +
            "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(projection);

    HashMap<Integer, Integer> flags = new HashMap<Integer, Integer>();
    flags.put(100, 1);
    flags.put(200, 0);
    flags.put(300, 1);
    flags.put(400, 0);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(flags.get(id), value);
    }
}
/**
 * Casts the AccumulatorBagCount result to chararray and matches it against
 * the pattern '1*' inside a bincond: 0 on a match, 1 otherwise.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithRegexp() throws IOException {
    String projection = "C = foreach B generate group, " +
            "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(projection);

    HashMap<Integer, Integer> flags = new HashMap<Integer, Integer>();
    flags.put(100, 1);
    flags.put(200, 0);
    flags.put(300, 1);
    flags.put(400, 0);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(flags.get(id), value);
    }
}
/**
 * Feeds the AccumulativeSumBag result into an 'is null' check inside a
 * bincond; ids 100 and 200 are expected to yield 0 and id 300 to yield 1.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithIsNull() throws IOException {
    String projection = "C = foreach B generate group, " +
            "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);";

    pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(projection);

    HashMap<Integer, Integer> flags = new HashMap<Integer, Integer>();
    flags.put(100, 0);
    flags.put(200, 0);
    flags.put(300, 1);

    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(flags.get(id), value);
    }
}
/**
 * Runs an outer cogroup with the batch size lowered to 1 and guards each
 * SUM with IsEmpty, so a side without rows for a key contributes 0.
 * The result rows are sorted before being compared with the expected list.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@SuppressWarnings("unchecked")
@Test
public void testAccumWithIsEmpty() throws IOException {
    pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "1");

    String guardedSums = "D = foreach C generate group," +
            "(int)(IsEmpty(A) ? 0 : SUM(A.id)) as suma," +
            "(int)(IsEmpty(B) ? 0 : SUM(B.id)) as sumb;";
    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
    pigServer.registerQuery("C = cogroup A by id outer, B by id outer;");
    pigServer.registerQuery(guardedSums);

    // (id, sum of ids in A, sum of ids in B); id 400 does not occur in B
    List<Tuple> wanted = new ArrayList<Tuple>();
    wanted.add(tuple(100, 200, 200));
    wanted.add(tuple(200, 200, 400));
    wanted.add(tuple(300, 900, 300));
    wanted.add(tuple(400, 400, 0));

    Iterator<Tuple> rows = pigServer.openIterator("D");
    List<Tuple> got = new ArrayList<Tuple>();
    for (; rows.hasNext();) {
        got.add(rows.next());
    }
    Collections.sort(got);

    assertEquals(wanted, got);
}
/**
 * Runs AccumulatorBagCount on a nested 'distinct' and adds 1; expects the
 * listed value per id and exactly four result rows.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithDistinct() throws IOException {
    String nestedDistinct = "C = foreach B { D = distinct A;" +
            "generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(nestedDistinct);

    HashMap<Integer, Integer> wanted = new HashMap<Integer, Integer>();
    wanted.put(100, 2);
    wanted.put(200, 2);
    wanted.put(300, 3);
    wanted.put(400, 2);

    int seen = 0;
    for (Iterator<Tuple> rows = pigServer.openIterator("C"); rows.hasNext();) {
        seen++;
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        Integer value = (Integer) row.get(1);
        assertEquals(wanted.get(id), value);
    }
    assertEquals(4, seen);
}
/**
 * Runs AccumulativeSumBag on the projection of a nested 'order by' and
 * expects the ordered fruit tuples per id as one string, with exactly four
 * result rows.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithSort() throws IOException {
    String nestedOrder = "D = foreach C { E = order B by f; F = E.f;" +
            "generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};";

    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
    pigServer.registerQuery("B = foreach A generate id, f, id as t;");
    pigServer.registerQuery("C = group B by id;");
    pigServer.registerQuery(nestedOrder);

    HashMap<Integer, String> wanted = new HashMap<Integer, String>();
    wanted.put(100, "(apple)(apple)");
    wanted.put(200, "(orange)");
    wanted.put(300, "(pear)(pear)(strawberry)");
    wanted.put(400, "(apple)");

    int seen = 0;
    for (Iterator<Tuple> rows = pigServer.openIterator("D"); rows.hasNext();) {
        seen++;
        Tuple row = rows.next();
        Integer id = (Integer) row.get(0);
        String value = (String) row.get(1);
        assertEquals(wanted.get(id), value);
    }
    assertEquals(4, seen);
}
/**
 * Checks the builtin AVG for a value column declared as double, float, int
 * and long in turn. Id 400 only has empty values and is expected to yield
 * null.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithBuiltinAvg() throws IOException {
    HashMap<Integer, Double> averages = new HashMap<Integer, Double>();
    averages.put(100, 3.0);
    averages.put(200, 2.1);
    averages.put(300, 3.3);
    averages.put(400, null);

    // Test all the averages for correct behaviour with null values
    String[] valueTypes = { "double", "float", "int", "long" };
    for (int idx = 0; idx < valueTypes.length; idx++) {
        boolean integralType = idx > 1;
        if (integralType) {
            // adjust decimal error for non real types
            averages.put(200, 2.0);
            averages.put(300, 3.0);
        }

        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:"
                + valueTypes[idx] + ");");
        pigServer.registerQuery("C = group A by id;");
        pigServer.registerQuery("D = foreach C generate group, AVG(A.v);");

        for (Iterator<Tuple> rows = pigServer.openIterator("D"); rows.hasNext();) {
            Tuple row = rows.next();
            Double wanted = averages.get((Integer) row.get(0));
            if (wanted == null) {
                assertNull(row.get(1));
            } else {
                double got = ((Number) row.get(1)).doubleValue();
                assertEquals(wanted.doubleValue(), got, 0.0001);
            }
        }
    }
}
/**
 * Checks the builtin SUM, COUNT, MIN and MAX on a double column, grouped by
 * id. A null entry in the expected array means the field must be null.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithBuiltin() throws IOException {
    pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
    pigServer.registerQuery("C = group A by id;");
    // moving AVG accumulator test to separate test case
    pigServer.registerQuery("D = foreach C generate group, SUM(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");

    // per id: { sum, count, min, max }
    HashMap<Integer, Double[]> wantedStats = new HashMap<Integer, Double[]>();
    wantedStats.put(100, new Double[] { 15.0, 5.0, 1.0, 5.0 });
    wantedStats.put(200, new Double[] { 6.3, 3.0, 1.1, 3.1 });
    wantedStats.put(300, new Double[] { 3.3, 1.0, 3.3, 3.3 });
    wantedStats.put(400, new Double[] { null, 0.0, null, null });

    for (Iterator<Tuple> rows = pigServer.openIterator("D"); rows.hasNext();) {
        Tuple row = rows.next();
        Double[] stats = wantedStats.get((Integer) row.get(0));
        for (int col = 0; col < stats.length; col++) {
            // field 0 is the group key, so statistics start at field 1
            int field = col + 1;
            if (stats[col] == null) {
                assertNull(row.get(field));
            } else {
                double got = ((Number) row.get(field)).doubleValue();
                assertEquals(stats[col].doubleValue(), got, 0.0001);
            }
        }
    }
}
/**
 * Pig 4365: runs TOP(5, 0, A) in a nested foreach over 'group all' and
 * compares the flattened result, after sorting, with the five expected tuples.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithTOP() throws IOException {
    String nestedTop = "D = foreach B { C = TOP(5, 0, A); generate flatten(C); }";

    pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
    pigServer.registerQuery("B = group A all;");
    pigServer.registerQuery(nestedTop);

    Iterator<Tuple> rows = pigServer.openIterator("D");
    String[] wantedTuples = {
        "(200,1.1)", "(200,2.1)", "(300,3.3)", "(400,null)", "(400,null)"
    };
    List<Tuple> wanted = Util.getTuplesFromConstantTupleStrings(wantedTuples);
    Util.checkQueryOutputsAfterSort(rows, wanted);
}
/**
 * Uses the builtin SUM several times in one foreach, both on its own and as
 * operands of an arithmetic expression, over a single group containing all
 * rows of the first input file.
 *
 * The ids in that file add up to 1700, so the two generated fields are
 * expected to print as "1700" and "3401" (1 + 1700 + 1700).
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithMultiBuiltin() throws IOException {
    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);");
    pigServer.registerQuery("C = group A by 1;");
    pigServer.registerQuery("D = foreach C generate SUM(A.id), 1+SUM(A.id)+SUM(A.id);");

    Iterator<Tuple> iter = pigServer.openIterator("D");
    int count = 0;
    while (iter.hasNext()) {
        count++;
        Tuple t = iter.next();
        // Compare the printed form, as the original equals() calls did, but actually assert on it
        assertEquals("1700", t.get(0).toString());
        assertEquals("3401", t.get(1).toString());
    }
    // grouping by the constant 1 produces a single group, hence a single row
    assertEquals(1, count);
}
/**
 * Pig 1105: runs COUNT_STAR on a grouped relation.
 *
 * The result rows are not inspected; the test only passes or fails on
 * whether opening the iterator for the alias throws.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumCountStar() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
pigServer.registerQuery("C = group A by id;");
pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");
// Return value deliberately ignored: only the absence of an exception is checked
pigServer.openIterator("D");
}
/**
 * See PIG-1963.
 * POSort and PODistinct are blocking operators, so accumulative mode cannot
 * be used while one of them is still in the plan after the secondary sort
 * optimization. This test uses two nested sorts on the same bag.
 *
 * @throws IOException if a query cannot be registered
 */
@Test
public void testAccumulatorOffOnSort() throws IOException {
    //one POSort will remain because secondary sort can be used only for one of them
    String twoNestedSorts = "C = foreach B " +
            "{ " +
            " o1 = order A by fruit;" +
            " o2 = order A by fruit desc;" +
            " generate org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
            " org.apache.pig.test.utils.AccumulativeSumBag(o2.fruit); " +
            "};";

    pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(twoNestedSorts);

    checkAccumulatorOff("C");
}
/**
 * See PIG-1963.
 * POSort and PODistinct are blocking operators, so accumulative mode cannot
 * be used while one of them is still in the plan after the secondary sort
 * optimization. This test combines a nested sort with a nested distinct.
 *
 * @throws IOException if a query cannot be registered
 */
@Test
public void testAccumulatorOffOnDistinct() throws IOException {
    String sortPlusDistinct = "C = foreach B " +
            "{ " +
            " o1 = order A by fruit;" +
            " d2 = distinct A.category;" +
            " generate org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
            " org.apache.pig.test.utils.AccumulativeSumBag(d2); " +
            "};";

    pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit, category);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(sortPlusDistinct);

    checkAccumulatorOff("C");
}
/**
 * Sets PIG_OPT_ACCUMULATOR to "false" and verifies through
 * checkAccumulatorOff that running AccumulativeSumBag then fails.
 * The property is set to "true" again afterwards.
 *
 * @throws IOException if a query cannot be registered
 */
@Test
public void testAccumulatorOff() throws IOException {
    pigServer.getPigContext().getProperties()
            .setProperty(PigConfiguration.PIG_OPT_ACCUMULATOR, "false");

    String sumPerGroup = "C = foreach B generate group," +
            "org.apache.pig.test.utils.AccumulativeSumBag(A);";
    pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
    pigServer.registerQuery("B = group A by id;");
    pigServer.registerQuery(sumPerGroup);

    checkAccumulatorOff("C");

    pigServer.getPigContext().getProperties()
            .setProperty(PigConfiguration.PIG_OPT_ACCUMULATOR, "true");
}
/**
 * Reads every row of the given alias and fails the test unless doing so
 * raises an exception.
 *
 * The AssertionError thrown by fail() is not an Exception, so the catch
 * clause below does not swallow it.
 *
 * @param alias the alias to execute
 */
private void checkAccumulatorOff(String alias) {
    try {
        for (Iterator<Tuple> rows = pigServer.openIterator(alias); rows.hasNext();) {
            rows.next();
        }
        fail("Accumulator should be off.");
    } catch (Exception expectedFailure) {
        // we should get exception
    }
}
/**
 * Groups the fourth input file by (id, url) and generates COUNT together
 * with a map lookup ('url') on the result of the URLPARSE test UDF.
 *
 * NOTE(review): as written the assertion cannot compare against the URLs
 * stored in {@code expected} (see the notes below), so this test passes
 * only if field 1 is null for every row. Confirm what URLPARSE returns for
 * this input and whether that is the intended check.
 *
 * @throws IOException if a query cannot be registered or executed
 */
@Test
public void testAccumWithMap() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
pigServer.registerQuery("B = group A by (id, url);");
pigServer.registerQuery("C = foreach B generate COUNT(A)," +
"org.apache.pig.test.utils.URLPARSE(group.url)#'url';");
HashMap<Integer, String> expected = new HashMap<Integer, String>();
expected.put(2, "http://ibm.com");
// NOTE(review): the next two entries share key 1, so the second one replaces the first
expected.put(1, "http://yahoo.com");
expected.put(1, "http://sun.com");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
Tuple t = iter.next();
// NOTE(review): the map is keyed by Integer but queried with a Long; a Long never
// equals an Integer, so this lookup always returns null
assertEquals(expected.get((Long)t.get(0)), (String)t.get(1));
}
}
/**
 * PIG-1837: cogroups two small inputs and adds up two SUMs whose arguments
 * are binconds that substitute the constant bag {(0)} for an empty side.
 *
 * Keys 1, 2, 4 and 5 occur across the two inputs, so four result rows are
 * expected; key 1 is the only one present on both sides.
 *
 * @throws Exception if the inputs cannot be written or uploaded, or a query fails
 */
@Test // PIG-1837
public void testBinCondCheck() throws Exception {
    writeLocalFileAndUpload("data1", "1\t11", "2\t15");
    writeLocalFileAndUpload("data2", "1\t10", "4\t11", "5\t10");

    pigServer.registerQuery("A = load 'data1' as (x:int, y:int);");
    pigServer.registerQuery("B = load 'data2' as (x:int, z:int);");
    pigServer.registerQuery("C = cogroup A by x, B by x;");
    pigServer.registerQuery("D = foreach C generate group," +
            "SUM((IsEmpty(A.y) ? {(0)} : A.y)) + SUM((IsEmpty(B.z) ? {(0)} : B.z));");

    HashMap<Integer, Long> expected = new HashMap<Integer, Long>();
    expected.put(1, 21L);
    expected.put(2, 15L);
    expected.put(4, 11L);
    expected.put(5, 10L);

    Iterator<Tuple> iter = pigServer.openIterator("D");
    int count = 0;
    while (iter.hasNext()) {
        count++;
        Tuple t = iter.next();
        assertEquals(expected.get((Integer) t.get(0)), (Long) t.get(1));
    }
    // Without this check an empty result would pass the loop above unnoticed
    assertEquals(4, count);
}

/**
 * Writes {@code lines} to a local file called {@code name}, closing the
 * writer even when writing fails, and then copies the file to the cluster
 * under the same name.
 *
 * NOTE(review): the local file is left in the working directory, as before;
 * confirm whether it should be removed after the upload.
 *
 * @param name  local and cluster file name
 * @param lines file content, one entry per line
 * @throws IOException if writing or uploading fails
 */
private static void writeLocalFileAndUpload(String name, String... lines) throws IOException {
    PrintWriter w = new PrintWriter(new FileWriter(name));
    try {
        for (String line : lines) {
            w.println(line);
        }
    } finally {
        w.close();
    }
    Util.copyFromLocalToCluster(cluster, name, name);
}
/**
 * See PIG-1911.
 * An accumulator UDF reads from a nested relational operator, and the
 * generate clause projects nothing but that UDF.
 *
 * @throws IOException if a query cannot be registered or executed
 * @throws ParserException declared by the signature
 */
@Test
public void testAccumAfterNestedOp() throws IOException, ParserException {
    String nestedOrderCount = "D = foreach C " +
            "{ o = order B by id; " +
            " generate org.apache.pig.test.utils.AccumulatorBagCount(o);}; ";

    // test group by
    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
    //additional test to enable columnprune(PIG-5224)
    pigServer.registerQuery("B = foreach A generate id;");
    pigServer.registerQuery("C = group B by id;");
    pigServer.registerQuery(nestedOrderCount);

    Iterator<Tuple> rows = pigServer.openIterator("D");
    // one count per group; compared after sorting, so the order here is irrelevant
    String[] wantedCounts = { "(2)", "(1)", "(3)", "(1)" };
    List<Tuple> wanted = Util.getTuplesFromConstantTupleStrings(wantedCounts);
    Util.checkQueryOutputsAfterSort(rows, wanted);
}
}