blob: db35a3c9d2161b9719998a479ed66ff4e5dc40dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCombiner {
private static MiniGenericCluster cluster;
private static Properties properties;
@BeforeClass
public static void oneTimeSetUp() throws Exception {
cluster = MiniGenericCluster.buildCluster();
properties = cluster.getProperties();
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
@Before
public void setUp() throws Exception {
Util.resetStateForExecModeSwitch();
}
@Test
public void testSuccessiveUserFuncs1() throws Exception {
String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +
"c = group a by c2; " +
"f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); " +
"store f into 'out';";
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
PigContext pc = pigServer.getPigContext();
assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
.isEmpty()));
pigServer.shutdown();
}
@Test
public void testSuccessiveUserFuncs2() throws Exception {
String dummyUDF = JiraPig1030.class.getName();
String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +
"c = group a by c2; " +
"f = foreach c generate COUNT(" + dummyUDF + "" +
"(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); " +
"store f into 'out';";
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
PigContext pc = pigServer.getPigContext();
assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
.isEmpty()));
pigServer.shutdown();
}
@Test
public void testOnCluster() throws Exception {
// run the test on cluster
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
String inputFileName = runTest(pigServer);
Util.deleteFile(cluster, inputFileName);
pigServer.shutdown();
}
@Test
public void testLocal() throws Exception {
// run the test locally
FileLocalizer.deleteTempFiles();
runTest(new PigServer("local"));
FileLocalizer.deleteTempFiles();
}
private String runTest(PigServer pig) throws IOException {
List<String> inputLines = new ArrayList<String>();
inputLines.add("a,b,1");
inputLines.add("a,b,1");
inputLines.add("a,c,1");
String inputFileName = loadWithTestLoadFunc("A", pig, inputLines);
pig.registerQuery("B = foreach A generate $0 as (c0:chararray), $1 as (c1:chararray), $2 as (c2:int);");
pig.registerQuery("C = group B by ($0, $1);");
pig.registerQuery("D = foreach C generate flatten(group), COUNT($1) as int;");
// Since the input has no schema, using Util.getTuplesFromConstantTupleStrings fails assert.
List<Tuple> resultTuples = Util.getTuplesFromConstantTupleStrings(
new String[]{
"('a','b',2)",
"('a','c',1)",
});
Iterator<Tuple> resultIterator = pig.openIterator("D");
Util.checkQueryOutputsAfterSort(resultIterator, resultTuples);
return inputFileName;
}
private String loadWithTestLoadFunc(String loadAlias, PigServer pig,
List<String> inputLines) throws IOException {
File inputFile = File.createTempFile("test", "txt");
inputFile.deleteOnExit();
String inputFileName = inputFile.getAbsolutePath();
if (pig.getPigContext().getExecType().isLocal()) {
PrintStream ps = new PrintStream(new FileOutputStream(inputFile));
for (String line : inputLines) {
ps.println(line);
}
ps.close();
} else {
inputFileName = Util.removeColon(inputFileName);
Util.createInputFile(cluster, inputFileName, inputLines.toArray(new String[] {}));
}
pig.registerQuery(loadAlias + " = load '"
+ Util.encodeEscape(inputFileName) + "' using "
+ PigStorage.class.getName() + "(',');");
return inputFileName;
}
@Test
public void testNoCombinerUse() {
// To simulate this, we will have two input files
// with exactly one input record - this should result
// in two map tasks and each would process only one record
// hence the combiner would not get called.
}
@Test
public void testMultiCombinerUse() throws Exception {
// test the scenario where the combiner is called multiple
// times - this can happen when the output of the map > io.sort.mb
// let's set the io.sort.mb to 1MB and > 1 MB map data.
String[] input = new String[500 * 1024];
for (int i = 0; i < input.length; i++) {
if (i % 2 == 0) {
input[i] = Integer.toString(1);
} else {
input[i] = Integer.toString(0);
}
}
Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
String oldValue = properties.getProperty("io.sort.mb");
properties.setProperty("io.sort.mb", "1");
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx1024m");
pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
pigServer.registerQuery("b = group a all;");
pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
"MIN(a.$0), MAX(a.$0), AVG(a.$0), ((double)SUM(a.$0))/COUNT(a.$0)," +
" COUNT(a.$0) + SUM(a.$0) + MAX(a.$0);");
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
checkCombinerUsed(pigServer, "c", true);
Iterator<Tuple> it = pigServer.openIterator("c");
Tuple t = it.next();
assertEquals(512000L, t.get(0));
assertEquals(256000L, t.get(1));
assertEquals(0, t.get(2));
assertEquals(1, t.get(3));
assertEquals(0.5, t.get(4));
assertEquals(0.5, t.get(5));
assertEquals(512000L + 256000L + 1, t.get(6));
assertFalse(it.hasNext());
Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
// Reset io.sort.mb to the original value before exit
if (oldValue == null) {
properties.remove("io.sort.mb");
} else {
properties.setProperty("io.sort.mb", oldValue);
}
pigServer.shutdown();
}
@Test
public void testDistinctAggs1() throws Exception {
// test the use of combiner for distinct aggs:
String input[] = {
"pig1\t18\t2.1",
"pig2\t24\t3.3",
"pig5\t45\t2.4",
"pig1\t18\t2.1",
"pig1\t19\t2.1",
"pig2\t24\t4.5",
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
" x = distinct a.age;" +
" y = distinct a.gpa;" +
" z = distinct a;" +
" generate group, COUNT(x), SUM(x.age), SUM(y.gpa), SUM(a.age), " +
" SUM(a.gpa), COUNT(z.age), COUNT(z), SUM(z.age);};");
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
checkCombinerUsed(pigServer, "c", true);
HashMap<String, Object[]> results = new HashMap<String, Object[]>();
results.put("pig1", new Object[] { "pig1", 3L, 57L, 5.2, 75L, 9.4, 3L, 3L, 57L });
results.put("pig2", new Object[] { "pig2", 1L, 24L, 7.8, 48L, 7.8, 2L, 2L, 48L });
results.put("pig5", new Object[] { "pig5", 1L, 45L, 2.4, 45L, 2.4, 1L, 1L, 45L });
Iterator<Tuple> it = pigServer.openIterator("c");
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
assertEquals(expected[i++], field);
}
}
Util.deleteFile(cluster, "distinctAggs1Input.txt");
pigServer.shutdown();
}
@Test
public void testGroupAndUnion() throws Exception {
// test use of combiner when group elements are accessed in the foreach
String input1[] = {
"ABC\t1\ta\t1",
"ABC\t1\tb\t2",
"ABC\t1\ta\t3",
"ABC\t2\tb\t4",
};
Util.createInputFile(cluster, "testGroupElements1.txt", input1);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.debugOn();
pigServer.registerQuery("a1 = load 'testGroupElements1.txt' " +
"as (str:chararray, num1:int, alph : chararray, num2 : int);");
pigServer.registerQuery("b1 = group a1 by str;");
// check if combiner is present or not for various forms of foreach
pigServer.registerQuery("c1 = foreach b1 generate flatten(group), COUNT(a1.alph), SUM(a1.num2); ");
String input2[] = {
"DEF\t2\ta\t3",
"DEF\t2\td\t5",
};
Util.createInputFile(cluster, "testGroupElements2.txt", input2);
pigServer.registerQuery("a2 = load 'testGroupElements2.txt' " +
"as (str:chararray, num1:int, alph : chararray, num2 : int);");
pigServer.registerQuery("b2 = group a2 by str;");
// check if combiner is present or not for various forms of foreach
pigServer.registerQuery("c2 = foreach b2 generate flatten(group), COUNT(a2.alph), SUM(a2.num2); ");
pigServer.registerQuery("c = union c1, c2;");
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[]{
"('ABC',4L,10L)",
"('DEF',2L,8L)",
});
Iterator<Tuple> it = pigServer.openIterator("c");
Util.checkQueryOutputsAfterSort(it, expectedRes);
Util.deleteFile(cluster, "testGroupElements1.txt");
Util.deleteFile(cluster, "testGroupElements2.txt");
pigServer.shutdown();
}
@Test
public void testGroupElements() throws Exception {
// test use of combiner when group elements are accessed in the foreach
String input[] = {
"ABC\t1\ta\t1",
"ABC\t1\tb\t2",
"ABC\t1\ta\t3",
"ABC\t2\tb\t4",
"DEF\t1\td\t1",
"XYZ\t1\tx\t2"
};
Util.createInputFile(cluster, "testGroupElements.txt", input);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
pigServer.registerQuery("b = group a by (str, num1);");
// check if combiner is present or not for various forms of foreach
pigServer.registerQuery("c = foreach b generate flatten(group), COUNT(a.alph), SUM(a.num2); ");
checkCombinerUsed(pigServer, "c", true);
pigServer.registerQuery("c = foreach b generate group, COUNT(a.alph), SUM(a.num2); ");
checkCombinerUsed(pigServer, "c", true);
// projecting bag - combiner should not be used
pigServer.registerQuery("c = foreach b generate group, a, COUNT(a.alph), SUM(a.num2); ");
checkCombinerUsed(pigServer, "c", false);
// projecting bag - combiner should not be used
pigServer.registerQuery("c = foreach b generate group, a.num2, COUNT(a.alph), SUM(a.num2); ");
checkCombinerUsed(pigServer, "c", false);
pigServer.registerQuery("c = foreach b generate group.$0, group.$1, COUNT(a.alph), SUM(a.num2); ");
checkCombinerUsed(pigServer, "c", true);
pigServer.registerQuery("c = foreach b generate group.$0, group.$1 + COUNT(a.alph), SUM(a.num2); ");
checkCombinerUsed(pigServer, "c", true);
pigServer.registerQuery("c = foreach b generate group.str, group.$1, COUNT(a.alph), SUM(a.num2); ");
checkCombinerUsed(pigServer, "c", true);
pigServer.registerQuery("c = foreach b generate group.str, group.$1, COUNT(a.alph), SUM(a.num2), "
+ " (group.num1 == 1 ? (COUNT(a.num2) + 1) : (SUM(a.num2) + 10)) ; ");
checkCombinerUsed(pigServer, "c", true);
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"('ABC',1,3L,6L,4L)",
"('ABC',2,1L,4L,14L)",
"('DEF',1,1L,1L,2L)",
"('XYZ',1,1L,2L,2L)",
});
Iterator<Tuple> it = pigServer.openIterator("c");
Util.checkQueryOutputsAfterSort(it, expectedRes);
Util.deleteFile(cluster, "distinctAggs1Input.txt");
pigServer.shutdown();
}
@Test
public void testGroupByLimit() throws Exception {
Assume.assumeFalse("Skip this test for Tez till PIG-5249 is fixed",
Util.isTezExecType(cluster.getExecType()));
// test use of combiner when group elements are accessed in the foreach
String input[] = {
"ABC 1",
"ABC 2",
"DEF 1",
"XYZ 1",
"XYZ 2",
"XYZ 3",
};
Util.createInputFile(cluster, "testGroupLimit.txt", input);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'testGroupLimit.txt' using PigStorage(' ') " +
"as (str:chararray, num1:int) ;");
pigServer.registerQuery("b = group a by str;");
pigServer.registerQuery("c = foreach b generate group, COUNT(a.num1) ; ");
// check if combiner is present
pigServer.registerQuery("d = limit c 2 ; ");
checkCombinerUsed(pigServer, "d", true);
List<Tuple> expectedRes =
Util.getTuplesFromConstantTupleStrings(
new String[] {
"('ABC',2L)",
"('DEF',1L)",
});
Iterator<Tuple> it = pigServer.openIterator("d");
Util.checkQueryOutputsAfterSort(it, expectedRes);
pigServer.shutdown();
}
private void checkCombinerUsed(PigServer pigServer, String alias, boolean combineExpected)
throws IOException {
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain(alias, ps);
boolean combinerFound;
if (pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) {
combinerFound = baos.toString().contains("Reduce By");
} else {
combinerFound = baos.toString().matches("(?si).*combine plan.*");
}
System.out.println(baos.toString());
assertEquals("is combiner present as expected", combineExpected, combinerFound);
}
@Test
public void testDistinctNoCombiner() throws Exception {
// test that combiner is NOT invoked when
// one of the elements in the foreach generate
// is a distinct() as the leaf
String input[] = {
"pig1\t18\t2.1",
"pig2\t24\t3.3",
"pig5\t45\t2.4",
"pig1\t18\t2.1",
"pig1\t19\t2.1",
"pig2\t24\t4.5",
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
" z = distinct a;" +
" generate group, z, SUM(a.age), SUM(a.gpa);};");
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
assertFalse(baos.toString().matches("(?si).*combine plan.*"));
HashMap<String, Object[]> results = new HashMap<String, Object[]>();
results.put("pig1", new Object[] { "pig1", "bag-place-holder", 75L, 9.4 });
results.put("pig2", new Object[] { "pig2", "bag-place-holder", 48L, 7.8 });
results.put("pig5", new Object[] { "pig5", "bag-place-holder", 45L, 2.4 });
Iterator<Tuple> it = pigServer.openIterator("c");
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
if (i == 1) {
// ignore the second field which is a bag
// for comparison here
continue;
}
assertEquals(expected[i++], field);
}
}
Util.deleteFile(cluster, "distinctNoCombinerInput.txt");
pigServer.shutdown();
}
@Test
public void testForEachNoCombiner() throws Exception {
// test that combiner is NOT invoked when
// one of the elements in the foreach generate
// has a foreach in the plan without a distinct agg
String input[] = {
"pig1\t18\t2.1",
"pig2\t24\t3.3",
"pig5\t45\t2.4",
"pig1\t18\t2.1",
"pig1\t19\t2.1",
"pig2\t24\t4.5",
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
" z = a.age;" +
" generate group, z, SUM(a.age), SUM(a.gpa);};");
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
assertFalse(baos.toString().matches("(?si).*combine plan.*"));
HashMap<String, Object[]> results = new HashMap<String, Object[]>();
results.put("pig1", new Object[] { "pig1", "bag-place-holder", 75L, 9.4 });
results.put("pig2", new Object[] { "pig2", "bag-place-holder", 48L, 7.8 });
results.put("pig5", new Object[] { "pig5", "bag-place-holder", 45L, 2.4 });
Iterator<Tuple> it = pigServer.openIterator("c");
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
if (i == 1) {
// ignore the second field which is a bag
// for comparison here
continue;
}
assertEquals(expected[i++], field);
}
}
Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
pigServer.shutdown();
}
@Test
public void testJiraPig746() throws Exception {
// test that combiner is NOT invoked when
// one of the elements in the foreach generate
// has a foreach in the plan without a distinct agg
String input[] = {
"pig1\t18\t2.1",
"pig2\t24\t3.3",
"pig5\t45\t2.4",
"pig1\t18\t2.1",
"pig1\t19\t2.1",
"pig2\t24\t4.5",
"pig1\t20\t3.1" };
String expected[] = {
"(pig1,75,{(pig1,18,2.1),(pig1,18,2.1),(pig1,19,2.1),(pig1,20,3.1)})",
"(pig2,48,{(pig2,24,3.3),(pig2,24,4.5)})",
"(pig5,45,{(pig5,45,2.4)})"
};
try {
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.age), a;");
// make sure there isn't a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
assertFalse(baos.toString().matches("(?si).*combine plan.*"));
Iterator<Tuple> it = pigServer.openIterator("c");
Util.checkQueryOutputsAfterSortRecursive(it, expected,
"group:chararray,age:long,b:{t:(name:chararray,age:int,gpa:double)}");
pigServer.shutdown();
} finally {
Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
}
}
public static class JiraPig1030 extends EvalFunc<DataBag> {
@Override
public DataBag exec(Tuple input) throws IOException {
return new DefaultDataBag();
}
}
@Test
public void testJiraPig1030() throws Exception {
// test that combiner is NOT invoked when
// one of the elements in the foreach generate
// has a non-algebraic UDF that have multiple inputs
// (one of them is distinct).
String input[] = {
"pig1\t18\t2.1",
"pig2\t24\t3.3",
"pig5\t45\t2.4",
"pig1\t18\t2.1",
"pig1\t19\t2.1",
"pig2\t24\t4.5",
"pig1\t20\t3.1" };
try {
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
PigServer pigServer = new PigServer(cluster.getExecType(), properties);
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a all;");
pigServer.registerQuery("c = foreach b {" +
" d = distinct a.age;" +
" generate group, " + JiraPig1030.class.getName() + "(d, 0);};");
// make sure there isn't a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pigServer.explain("c", ps);
assertFalse(baos.toString().matches("(?si).*combine plan.*"));
pigServer.shutdown();
} finally {
Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
}
}
}