/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.builtin.IntSum;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.util.Spillable;
import org.apache.pig.parser.ParserException;
import org.apache.pig.test.utils.GenPhyOp;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Strings;
/**
 * Tests the runtime behavior of POPartialAgg, the operator that performs
 * hash-based partial aggregation on the map side before the shuffle.
 */
public class TestPOPartialAgg {
private static ExecutorService executor = Executors.newSingleThreadExecutor();
private POPartialAgg partAggOp;
private PhysicalPlan parentPlan;
@AfterClass
public static void oneTimeTearDown() {
executor.shutdownNow();
}
@Before
public void setUp() throws Exception {
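// POPartialAgg reads settings such as pig.cachedbag.memusage from the
// conf held in PigMapReduce.sJobConfInternal, so give each test a fresh
// Configuration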
PigMapReduce.sJobConfInternal.set(new Configuration());
createPOPartialPlan(1);
}
private void createPOPartialPlan(int valueCount) throws PlanException {
createPOPartialPlan(valueCount, false);
}
private void createPOPartialPlan(int valueCount, boolean isGroupAll) throws PlanException {
parentPlan = new PhysicalPlan();
partAggOp = new POPartialAgg(GenPhyOp.getOK(), isGroupAll);
partAggOp.setParentPlan(parentPlan);
// setup key plan
PhysicalPlan keyPlan = new PhysicalPlan();
POProject keyProj = new POProject(GenPhyOp.getOK(), -1, 0);
keyProj.setResultType(DataType.INTEGER);
keyPlan.add(keyProj);
partAggOp.setKeyPlan(keyPlan);
// setup value plans
List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>();
for (int i = 0; i < valueCount; i++) {
// project arg for udf
PhysicalPlan valPlan = new PhysicalPlan();
POProject projVal1 = new POProject(GenPhyOp.getOK(), -1, i + 1);
projVal1.setResultType(DataType.BAG);
valPlan.add(projVal1);
// setup udf
List<PhysicalOperator> udfInps = new ArrayList<PhysicalOperator>();
udfInps.add(projVal1);
FuncSpec sumSpec = new FuncSpec(IntSum.Intermediate.class.getName());
POUserFunc sumUdf = new POUserFunc(GenPhyOp.getOK(), -1, udfInps,
sumSpec);
valPlan.add(sumUdf);
valPlan.connect(projVal1, sumUdf);
valuePlans.add(valPlan);
}
partAggOp.setValuePlans(valuePlans);
}
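/*
 * For orientation: the plan assembled above corresponds roughly to the
 * map-side combine stage Pig produces for a script of this shape (a
 * sketch only, not the exact plan the compiler would emit):
 *
 *   A = LOAD 'input' AS (key:int, val:int);
 *   B = GROUP A BY key;
 *   C = FOREACH B GENERATE group, SUM(A.val);
 *
 * The key plan projects column 0 (the group key), each value plan
 * projects one bag of partial sums and feeds it to IntSum.Intermediate,
 * and POPartialAgg combines rows with equal keys in memory.
 */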
@After
public void tearDown() throws Exception {
}
@Test
public void testPartialOneInput1() throws ExecException, ParserException {
// input tuple has key, and bag containing SUM.Init output
String[] tups1 = { "(1,(2L))" };
Tuple t = Util.getTuplesFromConstantTupleStrings(tups1).get(0);
checkSingleRow(t);
}
@Test
public void testPartialOneInput2() throws ExecException, ParserException {
// input tuple has key, and bag containing SUM.Init output
String[] tups1 = { "(null,(2L))" };
Tuple t = Util.getTuplesFromConstantTupleStrings(tups1).get(0);
checkSingleRow(t);
}
@Test
public void testPartialOneInput3() throws ExecException, ParserException {
// input tuple has key, and bag containing SUM.Init output
String[] tups1 = { "(1,(null))" };
Tuple t = Util.getTuplesFromConstantTupleStrings(tups1).get(0);
checkSingleRow(t);
}
private void checkSingleRow(Tuple t) throws ExecException {
Result res;
// attaching one input tuple, result tuple stays in operator, expect EOP
partAggOp.attachInput(t);
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
// end of all input, now expecting results
parentPlan.endOfAllInput = true;
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_OK, res.returnStatus);
assertEquals(t, res.result);
}
@Test
public void testPartialAggNoInput() throws ExecException, ParserException {
// nothing attached, expecting EOP
Result res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
// end of all input, still no results
parentPlan.endOfAllInput = true;
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
}
@Test
public void testPartialMultiInput1() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(1,(1L))", "(1,(2L))", "(2,(1L))" };
String[] outputTups = { "(1,(3L))", "(2,(1L))" };
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
public void testPartialMultiInput2() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
String[] outputTups = { "(1,(3L))", "(2,(2L))" };
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
public void testPartialMultiInput3() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(null,(1L))", "(null,(2L))", "(null,(2L))" };
String[] outputTups = { "(null,(5L))" };
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
public void testPartialMultiInput4() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
String[] outputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
// The case where there is no memory available to the hashmap
public void testPartialMultiInputHashMemEmpty1() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
String[] outputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
checkInputAndOutput(inputTups, outputTups, true);
}
@Test
// The case where there is no memory available to the hashmap
public void testPartialMultiInputHashMemEmpty2() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
// since rows with the same group key are not in consecutive positions
// and the hashmap is given no memory, they do not get
// aggregated by POPartialAgg
String[] outputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
checkInputAndOutput(inputTups, outputTups, true);
}
@Test
public void testPartialMultiInput5() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
String[] outputTups = { "(1,(3L))", "(2,(2L))" };
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
public void testPartialMultiInput6() throws Exception {
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(null,(1L))", "(null,(2L))", "(null,(2L))" };
String[] outputTups = { "(null,(5L))" };
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
public void testMultiVals() throws Exception {
// more than one value to be aggregated
createPOPartialPlan(2);
// input tuple has key, and bag containing SUM.Init output
String[] inputTups = { "(1,(1L),(2L))", "(2,(2L),(1L))", "(1,(2L),(2L))" };
String[] outputTups = { "(1,(3L),(4L))", "(2,(2L),(1L))" };
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
public void testMultiValCheckNotDisabled() throws Exception {
// "large" number of values per input to aggregate but good reduction
// in size due to aggregation.
// This case should result in a reduction from 10500 inputs to 500
// outputs (a factor of 21), so in-memory aggregation should not be
// disabled in checkSize(). If it were disabled, too many output rows
// would be generated.
int numKeys = 500;
int numVals = 3;
createPOPartialPlan(numVals);
// Build a string of values to use in all input tuples
String vals = Strings.repeat(",(1L)", numVals);
// Build the input tuples.
// We need the next multiple of numKeys above 10,000 because we need to
// trigger the size check (at 10,000), and we want an exact multiple of
// numKeys so result values come out even across keys
int numInputs = (10000 + numKeys * 2 - 1) / numKeys * numKeys;
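// e.g. with numKeys = 500: (10000 + 2 * 500 - 1) / 500 = 21 (integer
// division), and 21 * 500 = 10500 inputs, i.e. 21 rows per key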
String[] inputTups = new String[numInputs];
for (int i = 0; i < numInputs; i++) {
inputTups[i] = "(" + (i % numKeys) + vals + ")";
}
// Build expected results
int expectedVal = numInputs / numKeys;
vals = Strings.repeat(",(" + expectedVal + "L)", numVals);
String[] outputTups = new String[numKeys];
for (int i = 0; i < numKeys; i++) {
outputTups[i] = "(" + i + vals + ")";
}
// input tuple has key, and bag containing SUM.Init output
checkInputAndOutput(inputTups, outputTups, false);
}
@Test
public void testMemorySpill1() throws Exception {
// Test a spill that results only in aggregation (no records emitted)
Result res;
for (long i = 1; i <= 15; i++) {
Tuple t = tuple(1, tuple(i));
partAggOp.attachInput(t);
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
}
Future<Long> spilled = executor.submit(new Spill(partAggOp));
Thread.sleep(100);
partAggOp.attachInput(tuple(2, tuple(-1L)));
assertFalse(spilled.isDone());
res = partAggOp.getNextTuple();
// Since it was aggregated there should be no records emitted
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
Thread.sleep(100);
assertTrue(spilled.isDone());
assertEquals(Long.valueOf(1L), spilled.get());
List<Tuple> expectedValues = new ArrayList<Tuple>();
expectedValues.add(tuple(1, tuple(120L))); // aggregated result: 1 + 2 + ... + 15 = 120
expectedValues.add(tuple(2, tuple(-1L)));
// end of all input, now expecting all tuples
parentPlan.endOfAllInput = true;
res = partAggOp.getNextTuple();
do {
assertEquals(POStatus.STATUS_OK, res.returnStatus);
assertTrue(expectedValues.remove(res.result));
res = partAggOp.getNextTuple();
} while (res.returnStatus != POStatus.STATUS_EOP);
assertTrue(expectedValues.isEmpty());
}
@Test
public void testMemorySpill2() throws Exception {
// Test a spill that emits records because aggregation does not meet the secondary-tier threshold
Result res = null;
List<Tuple> expectedValues = new ArrayList<Tuple>();
// POPartialAgg.SECOND_TIER_THRESHOLD evaluates to 2000 by default
for (long i = 1; i <= 2001; i++) {
Tuple t = tuple(i, tuple(i));
expectedValues.add(t);
partAggOp.attachInput(t);
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
}
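// With 2001 distinct keys the in-memory map holds one row per key, so it
// exceeds the 2000-row threshold without achieving any reduction; the
// spill below should therefore stream the records out instead of
// aggregating them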
Future<Long> spilled = executor.submit(new Spill(partAggOp));
Thread.sleep(100);
partAggOp.attachInput(tuple(2, tuple(-1L)));
expectedValues.add(tuple(2, tuple(-1L)));
long i = 0;
res = partAggOp.getNextTuple();
do {
assertFalse(spilled.isDone());
assertEquals(POStatus.STATUS_OK, res.returnStatus);
assertTrue(expectedValues.remove(res.result));
i++;
res = partAggOp.getNextTuple();
} while (res.returnStatus != POStatus.STATUS_EOP);
assertEquals(2002, i);
assertTrue(expectedValues.isEmpty());
Thread.sleep(100);
assertTrue(spilled.isDone());
assertEquals(Long.valueOf(1L), spilled.get());
}
@Test
public void testGroupAll() throws Exception {
createPOPartialPlan(1, true);
Result res;
for (long i = 1; i <= 10010; i++) {
Tuple t = tuple("all", tuple(i));
partAggOp.attachInput(t);
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
}
// end of all input, now expecting all tuples
parentPlan.endOfAllInput = true;
res = partAggOp.getNextTuple();
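// sum of 1..10010 = 10010 * 10011 / 2 = 50105055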
assertEquals(tuple("all", tuple(50105055L)), res.result);
assertEquals(POStatus.STATUS_OK, res.returnStatus);
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
}
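/**
 * Calls {@link Spillable#spill()} on a separate thread. In production the
 * call comes from Pig's SpillableMemoryManager under memory pressure; the
 * tests above invoke it directly so they can control how the spill
 * interleaves with incoming tuples.
 */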
private static class Spill implements Callable<Long> {
private Spillable spillable;
public Spill(Spillable spillable) {
this.spillable = spillable;
}
@Override
public Long call() throws Exception {
return spillable.spill();
}
}
/**
 * Run the plan on inputTups and check that the output matches outputTups.
 * If isMapMemEmpty is true, the memory available to the hash map is set
 * to zero before running.
 *
 * @param inputTups
 * @param outputTups
 * @param isMapMemEmpty
 * @throws Exception
 */
private void checkInputAndOutput(String[] inputTups, String[] outputTups,
boolean isMapMemEmpty) throws Exception {
if (isMapMemEmpty) {
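// pig.cachedbag.memusage is the fraction of the heap that POPartialAgg
// (and cached bags) may use; 0 leaves the operator no room to buffer,
// so rows pass through largely un-aggregated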
PigMapReduce.sJobConfInternal.get().set(PigConfiguration.PIG_CACHEDBAG_MEMUSAGE,
"0");
}
List<Tuple> inputs = Util.getTuplesFromConstantTupleStrings(inputTups);
List<Tuple> expectedOuts = Util
.getTuplesFromConstantTupleStrings(outputTups);
List<Tuple> outputs = new ArrayList<Tuple>();
// run through the inputs
for (Tuple t : inputs) {
Result res;
// attaching one input tuple, result tuple stays in operator, expect
// EOP
partAggOp.attachInput(t);
res = partAggOp.getNextTuple();
if (isMapMemEmpty) {
addResults(res, outputs);
} else {
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
}
}
// start getting the outputs
// end of all input, now expecting results
parentPlan.endOfAllInput = true;
if (isMapMemEmpty) {
Result res = partAggOp.getNextTuple();
// only one last output expected
addResults(res, outputs);
res = partAggOp.getNextTuple();
assertEquals(POStatus.STATUS_EOP, res.returnStatus);
Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
} else {
while (true) {
Result res = partAggOp.getNextTuple();
if (!addResults(res, outputs)) {
break;
}
}
Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
}
}
private boolean addResults(Result res, List<Tuple> outputs) {
if (res.returnStatus == POStatus.STATUS_EOP) {
return false;
} else if (res.returnStatus == POStatus.STATUS_OK) {
outputs.add((Tuple) res.result);
return true;
} else {
fail("Invalid result status " + res.returnStatus);
return false; // to keep compiler happy
}
}
}