PIG-627 Multi-query M3, support for multiple group-bys in one MR job.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/pig/branches/multiquery@769498 13f79535-47bb-0310-9956-ffa450edef68
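
For context, the scenario this milestone targets is a batch script with several group-bys hanging off one splitter. A minimal sketch in the style of the new TestMultiQuery cases below; the class name and file paths are illustrative, while setBatchOn()/executeBatch() are the batch APIs the tests themselves use:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class MultiQueryM3Sketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.setBatchOn();
            pig.registerQuery("a = load 'passwd' using PigStorage(':') " +
                "as (uname:chararray, passwd:chararray, uid:int, gid:int);");
            pig.registerQuery("b = filter a by uid < 5;");
            pig.registerQuery("b1 = group b by gid;");
            pig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
            pig.registerQuery("store b2 into '/tmp/output1';");
            pig.registerQuery("c = filter a by uid >= 5;");
            pig.registerQuery("c1 = group c by gid;");
            pig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
            pig.registerQuery("store c2 into '/tmp/output2';");
            // With this patch, the splitter and both group-bys compile into
            // a single map-reduce job rather than one job per store (see the
            // updated checkMRPlan expectations in TestMultiQuery below).
            pig.executeBatch();
        }
    }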
diff --git a/CHANGES.txt b/CHANGES.txt
index 7fc41e4..4f6df08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -613,3 +613,5 @@
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
+
+ PIG-627: multiquery support M3 (rding via gates)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
index 7d9e0b5..89f194b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
@@ -238,6 +238,8 @@
NoopFilterRemover fRem = new NoopFilterRemover(plan);
fRem.visit();
+ // reduces the number of MROpers in the MR plan generated
+ // by a multi-query (multi-store) script.
MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
mqOptimizer.visit();
@@ -246,7 +248,7 @@
// NoopFilterRemover.
NoopStoreRemover sRem = new NoopStoreRemover(plan);
sRem.visit();
-
+
// check whether stream operator is present
// after MultiQueryOptimizer because it can shift streams from
// map to reduce, etc.
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
index 05f4ce0..9fc377a 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
@@ -19,13 +19,19 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.DataType;
@@ -34,24 +40,28 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.PigException;
+
/**
* An optimizer that merges all or part of the splittee MapReduceOpers into
- * splitter MapReduceOper. The merge can produce a MROperPlan that has
+ * the splitter MapReduceOper.
+ * <p>
+ * The merge can produce a MROperPlan that has
* fewer MapReduceOpers than MapReduceOpers in the original MROperPlan.
- *
+ * <p>
* The MRCompiler generates multiple MapReduceOpers whenever it encounters
* a split operator and connects the single splitter MapReduceOper to
* one or more splittee MapReduceOpers using store/load operators:
- *
+ * <p>
* ---- POStore (in splitter) -... ----
* | | ... |
* | | ... |
* POLoad POLoad ... POLoad (in splittees)
* | | |
- *
+ * <p>
* This optimizer merges those MapReduceOpers by replacing POLoad/POStore
* combination with POSplit operator.
*/
@@ -68,15 +78,24 @@
nig = NodeIdGenerator.getGenerator();
List<MapReduceOper> roots = plan.getRoots();
scope = roots.get(0).getOperatorKey().getScope();
+
+ log.info("MR plan size before optimization: " + plan.size());
}
@Override
+ public void visit() throws VisitorException {
+ super.visit();
+
+ log.info("MR plan size after optimization: " + mPlan.size());
+ }
+
+ @Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
-
+
if (!mr.isSplitter()) {
return;
- }
-
+ }
+
// first classify all the splittees
List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
List<MapReduceOper> multiLoadMROpers = new ArrayList<MapReduceOper>();
@@ -86,23 +105,26 @@
for (MapReduceOper successor : successors) {
if (isMapOnly(successor)) {
if (isSingleLoadMapperPlan(successor.mapPlan)) {
- mappers.add(successor);
+ mappers.add(successor);
} else {
multiLoadMROpers.add(successor);
}
} else {
if (isSingleLoadMapperPlan(successor.mapPlan)) {
- mapReducers.add(successor);
+ mapReducers.add(successor);
} else {
- multiLoadMROpers.add(successor);
+ multiLoadMROpers.add(successor);
}
}
}
-
+
// case 1: exactly one splittee and it's map-only
if (mappers.size() == 1 && mapReducers.size() == 0
&& multiLoadMROpers.size() == 0 ) {
mergeOnlyMapperSplittee(mappers.get(0), mr);
+
+ log.info("Merged the only map-only splittee.");
+
return;
}
@@ -110,9 +132,15 @@
if (isMapOnly(mr) && mapReducers.size() == 1
&& mappers.size() == 0 && multiLoadMROpers.size() == 0) {
mergeOnlyMapReduceSplittee(mapReducers.get(0), mr);
+
+ log.info("Merged the only map-reduce splittee.");
+
return;
}
+ int numSplittees = successors.size();
+ int numMerges = 0;
+
PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;
POStore storeOp = (POStore)splitterPl.getLeaves().get(0);
@@ -120,38 +148,32 @@
// case 3: multiple splittees and at least one of them is map-only
if (mappers.size() > 0) {
- splitOp = getSplit();
- mergeAllMapOnlySplittees(mappers, mr, splitOp);
+ splitOp = getSplit();
+ int n = mergeAllMapOnlySplittees(mappers, mr, splitOp);
+
+ log.info("Merged " + n + " map-only splittees.");
+
+ numMerges += n;
}
-
- boolean splitterMapOnly = isMapOnly(mr);
-
- // case 4: multiple splittees and at least one of them has reducer
- if (splitterMapOnly && mapReducers.size() > 0) {
-
- // pick one to merge, prefer one that has a combiner
- MapReduceOper mapReducer= mapReducers.get(0);
- for (MapReduceOper mro : mapReducers) {
- if (!mro.combinePlan.isEmpty()) {
- mapReducer = mro;
- break;
- }
- }
+ // case 4: multiple splittees and at least one of them has reducer
+ if (isMapOnly(mr) && mapReducers.size() > 0) {
+
PhysicalOperator leaf = splitterPl.getLeaves().get(0);
- splitOp = (leaf instanceof POStore) ?
- getSplit() : (POSplit)leaf;
+ splitOp = (leaf instanceof POStore) ? getSplit() : (POSplit)leaf;
- mergeSingleMapReduceSplittee(mapReducer, mr, splitOp);
+ int n = mergeMapReduceSplittees(mapReducers, mr, splitOp);
+
+ log.info("Merged " + n + " map-reduce splittees.");
+
+ numMerges += n;
}
-
+
// finally, add original store to the split operator
// if there is splittee that hasn't been merged
if (splitOp != null
- && ((multiLoadMROpers.size() > 0)
- || (mapReducers.size() > 1)
- || (!splitterMapOnly && mapReducers.size() > 0))) {
+ && (numMerges < numSplittees)) {
PhysicalPlan storePlan = new PhysicalPlan();
try {
@@ -161,13 +183,17 @@
int errCode = 2129;
String msg = "Internal Error. Unable to add store to the split plan for optimization.";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
+ }
}
+
+ log.info("Merged " + numMerges + " out of total "
+ + numSplittees + " splittees.");
}
-
+
private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter)
throws VisitorException {
- PhysicalPlan splitterPl = isMapOnly(splitter) ? splitter.mapPlan : splitter.reducePlan;
+ PhysicalPlan splitterPl = isMapOnly(splitter) ?
+ splitter.mapPlan : splitter.reducePlan;
POStore storeOp = (POStore)splitterPl.getLeaves().get(0);
List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);
@@ -203,14 +229,14 @@
}
}
- private void mergeOnlyMapperSplittee(MapReduceOper mapper, MapReduceOper splitter)
- throws VisitorException {
+ private void mergeOnlyMapperSplittee(MapReduceOper mapper,
+ MapReduceOper splitter) throws VisitorException {
mergeOneMapPart(mapper, splitter);
removeAndReconnect(mapper, splitter);
}
- private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, MapReduceOper splitter)
- throws VisitorException {
+ private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer,
+ MapReduceOper splitter) throws VisitorException {
mergeOneMapPart(mapReducer, splitter);
splitter.setMapDone(true);
@@ -220,7 +246,7 @@
removeAndReconnect(mapReducer, splitter);
}
- private void mergeAllMapOnlySplittees(List<MapReduceOper> mappers,
+ private int mergeAllMapOnlySplittees(List<MapReduceOper> mappers,
MapReduceOper splitter, POSplit splitOp) throws VisitorException {
PhysicalPlan splitterPl = isMapOnly(splitter) ?
@@ -229,22 +255,16 @@
List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);
// merge splitee's map plans into nested plan of
- // the splitter operator
+ // the split operator
for (MapReduceOper mapper : mappers) {
PhysicalPlan pl = mapper.mapPlan;
PhysicalOperator load = pl.getRoots().get(0);
- pl.remove(load);
- try {
- splitOp.addPlan(pl);
- } catch (PlanException e) {
- int errCode = 2130;
- String msg = "Internal Error. Unable to merge split plans for optimization.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
+ pl.remove(load);
+ splitOp.addPlan(pl);
}
// replace store operator in the splitter with split operator
- splitOp.setInputs(storePreds);
+ splitOp.setInputs(storePreds);
try {
splitterPl.replace(storeOp, splitOp);
} catch (PlanException e) {
@@ -257,6 +277,406 @@
for (MapReduceOper mapper : mappers) {
removeAndReconnect(mapper, splitter);
}
+
+ return mappers.size();
+ }
+
+ private boolean isSplitteeMergeable(MapReduceOper splittee) {
+
+ // cannot be global sort or limit after sort, they are
+ // using a different partitioner
+ if (splittee.isGlobalSort() || splittee.isLimitAfterSort()) {
+ log.info("Cannot merge this splittee: " +
+ "it is global sort or limit after sort");
+ return false;
+ }
+
+ // check the plan leaf: only merge local rearrange or split
+ PhysicalOperator leaf = splittee.mapPlan.getLeaves().get(0);
+ if (!(leaf instanceof POLocalRearrange) &&
+ ! (leaf instanceof POSplit)) {
+ log.info("Cannot merge this splittee: " +
+ "its map plan doesn't end with LR or Split operator: "
+ + leaf.getClass().getName());
+ return false;
+ }
+
+ // cannot have distinct combiner, it uses a different combiner
+ if (splittee.needsDistinctCombiner()) {
+ log.info("Cannot merge this splittee: " +
+ "it has distinct combiner.");
+ return false;
+ }
+
+ return true;
+ }
+
+ private List<MapReduceOper> getMergeList(List<MapReduceOper> mapReducers) {
+ List<MapReduceOper> mergeNoCmbList = new ArrayList<MapReduceOper>();
+ List<MapReduceOper> mergeCmbList = new ArrayList<MapReduceOper>();
+
+ for (MapReduceOper mrOp : mapReducers) {
+ if (isSplitteeMergeable(mrOp)) {
+ if (mrOp.combinePlan.isEmpty()) {
+ mergeNoCmbList.add(mrOp);
+ } else {
+ mergeCmbList.add(mrOp);
+ }
+ }
+ }
+ return (mergeNoCmbList.size() > mergeCmbList.size()) ?
+ mergeNoCmbList : mergeCmbList;
+ }
+
+ private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers,
+ MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+
+ List<MapReduceOper> mergeList = getMergeList(mapReducers);
+
+ if (mergeList.size() <= 1) {
+
+ // choose one to merge, prefer the one with a combiner
+ MapReduceOper mapReducer = mapReducers.get(0);
+ for (MapReduceOper mro : mapReducers) {
+ if (!mro.combinePlan.isEmpty()) {
+ mapReducer = mro;
+ break;
+ }
+ }
+ mergeList.clear();
+ mergeList.add(mapReducer);
+ }
+
+ if (mergeList.size() == 1) {
+ mergeSingleMapReduceSplittee(mergeList.get(0), splitter, splitOp);
+ } else {
+ mergeAllMapReduceSplittees(mergeList, splitter, splitOp);
+ }
+
+ return mergeList.size();
+ }
+
+ private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
+ boolean sameKeyType = true;
+ for (MapReduceOper outer : splittees) {
+ for (MapReduceOper inner : splittees) {
+ if (inner.mapKeyType != outer.mapKeyType) {
+ sameKeyType = false;
+ break;
+ }
+ }
+ if (!sameKeyType) break;
+ }
+
+ return sameKeyType;
+ }
+
+ private int setIndexOnLRInSplit(int initial, POSplit splitOp)
+ throws VisitorException {
+ int index = initial;
+
+ List<PhysicalPlan> pls = splitOp.getPlans();
+ for (PhysicalPlan pl : pls) {
+ PhysicalOperator leaf = pl.getLeaves().get(0);
+ if (leaf instanceof POLocalRearrange) {
+ POLocalRearrange lr = (POLocalRearrange)leaf;
+ try {
+ lr.setMultiQueryIndex(index++);
+ } catch (ExecException e) {
+ int errCode = 2136;
+ String msg = "Internal Error. Unable to set multi-query index for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ } else if (leaf instanceof POSplit) {
+ POSplit spl = (POSplit)leaf;
+ index = setIndexOnLRInSplit(index, spl);
+ }
+ }
+
+ return index;
+ }
+
+ private int mergeOneMapPlanWithIndex(PhysicalPlan pl, POSplit splitOp,
+ int index, boolean sameKeyType) throws VisitorException {
+ PhysicalOperator load = pl.getRoots().get(0);
+ pl.remove(load);
+
+ int curIndex = index;
+
+ PhysicalOperator leaf = pl.getLeaves().get(0);
+ if (leaf instanceof POLocalRearrange) {
+ POLocalRearrange lr = (POLocalRearrange)leaf;
+ try {
+ lr.setMultiQueryIndex(curIndex++);
+ } catch (ExecException e) {
+ int errCode = 2136;
+ String msg = "Internal Error. Unable to set multi-query index for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+
+ // change the map key type to tuple when
+ // multiple splittees have different map key types
+ if (!sameKeyType) {
+ lr.setKeyType(DataType.TUPLE);
+ }
+ } else if (leaf instanceof POSplit) {
+ POSplit spl = (POSplit)leaf;
+ curIndex = setIndexOnLRInSplit(index, spl);
+ }
+
+ splitOp.addPlan(pl);
+
+ return curIndex;
+ }
+
+ private int setBaseIndexOnDemux(int initial, PODemux demuxOp)
+ throws VisitorException {
+ int index = initial;
+ demuxOp.setBaseIndex(index++);
+
+ List<PhysicalPlan> pls = demuxOp.getPlans();
+ for (PhysicalPlan pl : pls) {
+ PhysicalOperator leaf = pl.getLeaves().get(0);
+ if (leaf instanceof POLocalRearrange) {
+ POLocalRearrange lr = (POLocalRearrange)leaf;
+ try {
+ lr.setMultiQueryIndex(initial + lr.getIndex());
+ } catch (ExecException e) {
+ int errCode = 2136;
+ String msg = "Internal Error. Unable to set multi-query index for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ PhysicalOperator root = pl.getRoots().get(0);
+ if (root instanceof PODemux) {
+ index = setBaseIndexOnDemux(index, (PODemux)root);
+ } else {
+ index++;
+ }
+ }
+ return index;
+ }
+
+ private int setBaseIndexOnPackage(int initial, POMultiQueryPackage pkgOp) {
+ int index = initial;
+ pkgOp.setBaseIndex(index++);
+
+ List<POPackage> pkgs = pkgOp.getPackages();
+ for (POPackage pkg : pkgs) {
+ if (pkg instanceof POMultiQueryPackage) {
+ POMultiQueryPackage mpkg = (POMultiQueryPackage)pkg;
+ index = setBaseIndexOnPackage(index, mpkg);
+ } else {
+ index++;
+ }
+ }
+ return index;
+ }
+
+ private void mergeOneReducePlanWithIndex(PhysicalPlan from,
+ PhysicalPlan to, int initial, int current) throws VisitorException {
+ POPackage pk = (POPackage)from.getRoots().get(0);
+ from.remove(pk);
+
+ // XXX the index of the original keyInfo map is always 0;
+ // we need to shift the index so that the lookup works
+ // with the new indexed key
+ Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pk.getKeyInfo();
+ if (keyInfo != null && keyInfo.size() > 0) {
+ byte b = (byte)(initial | 0x80);
+ keyInfo.put(new Integer(b), keyInfo.get(0));
+ }
+
+ if (pk instanceof POMultiQueryPackage) {
+ POMultiQueryPackage mpkg = (POMultiQueryPackage)pk;
+ setBaseIndexOnPackage(initial, mpkg);
+ }
+
+ PhysicalOperator root = from.getRoots().get(0);
+ if (root instanceof PODemux) {
+ PODemux demux = (PODemux)root;
+ setBaseIndexOnDemux(initial, demux);
+ }
+
+ POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+ for (int i=initial; i<current; i++) {
+ pkg.addPackage(pk);
+ }
+
+ PODemux demux = (PODemux)to.getLeaves().get(0);
+ for (int i=initial; i<current; i++) {
+ demux.addPlan(from);
+ }
+
+ if (demux.isSameMapKeyType()) {
+ pkg.setKeyType(pk.getKeyType());
+ } else {
+ pkg.setKeyType(DataType.TUPLE);
+ }
+ }
+
+ private void mergeOneCombinePlanWithIndex(PhysicalPlan from,
+ PhysicalPlan to, int initial, int current) throws VisitorException {
+ POPackage cpk = (POPackage)from.getRoots().get(0);
+ from.remove(cpk);
+
+ if (cpk instanceof POMultiQueryPackage) {
+ POMultiQueryPackage mpkg = (POMultiQueryPackage)cpk;
+ setBaseIndexOnPackage(initial, mpkg);
+ }
+
+ PODemux demux = (PODemux)to.getLeaves().get(0);
+
+ boolean isSameKeyType = demux.isSameMapKeyType();
+
+ PhysicalOperator leaf = from.getLeaves().get(0);
+ if (leaf instanceof POLocalRearrange) {
+ POLocalRearrange clr = (POLocalRearrange)leaf;
+ try {
+ clr.setMultiQueryIndex(initial);
+ } catch (ExecException e) {
+ int errCode = 2136;
+ String msg = "Internal Error. Unable to set multi-query index for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+
+ // change the map key type to tuple when
+ // multiple splittees have different map key types
+ if (!isSameKeyType) {
+ clr.setKeyType(DataType.TUPLE);
+ }
+ } else if (leaf instanceof PODemux) {
+ PODemux locDemux = (PODemux)leaf;
+ setBaseIndexOnDemux(initial, locDemux);
+ }
+
+ POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+ for (int i=initial; i<current; i++) {
+ pkg.addPackage(cpk);
+ }
+
+ // all packages should have the same key type
+ if (!isSameKeyType) {
+ cpk.setKeyType(DataType.TUPLE);
+ }
+
+ pkg.setKeyType(cpk.getKeyType());
+
+ for (int i=initial; i<current; i++) {
+ demux.addPlan(from);
+ }
+ }
+
+ private boolean needCombiner(List<MapReduceOper> mapReducers) {
+ boolean needCombiner = false;
+ for (MapReduceOper mrOp : mapReducers) {
+ if (!mrOp.combinePlan.isEmpty()) {
+ needCombiner = true;
+ break;
+ }
+ }
+ return needCombiner;
+ }
+
+ private PhysicalPlan createDemuxPlan(boolean sameKeyType, boolean isCombiner)
+ throws VisitorException {
+ PODemux demux = getDemux(sameKeyType, isCombiner);
+ POMultiQueryPackage pkg = getMultiQueryPackage();
+
+ PhysicalPlan pl = new PhysicalPlan();
+ pl.add(pkg);
+ try {
+ pl.addAsLeaf(demux);
+ } catch (PlanException e) {
+ int errCode = 2137;
+ String msg = "Internal Error. Unable to add demux to the plan as leaf for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ return pl;
+ }
+
+ private void mergeAllMapReduceSplittees(List<MapReduceOper> mergeList,
+ MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+
+ boolean sameKeyType = hasSameMapKeyType(mergeList);
+
+ log.info("Splittees have the same key type: " + sameKeyType);
+
+ // create a new reduce plan that will be the container
+ // for the multiple reducer plans of the MROpers in the mergeList
+ PhysicalPlan redPl = createDemuxPlan(sameKeyType, false);
+
+ // create a new combine plan that will be the container
+ // for the multiple combiner plans of the MROpers in the mergeList
+ PhysicalPlan comPl = needCombiner(mergeList) ?
+ createDemuxPlan(sameKeyType, true) : null;
+
+ log.info("Splittees have combiner: " + (comPl != null));
+
+ int index = 0;
+
+ for (MapReduceOper mrOp : mergeList) {
+
+ // merge the map plan
+ int incIndex = mergeOneMapPlanWithIndex(
+ mrOp.mapPlan, splitOp, index, sameKeyType);
+
+ // merge the combiner plan
+ if (comPl != null) {
+ if (!mrOp.combinePlan.isEmpty()) {
+ mergeOneCombinePlanWithIndex(
+ mrOp.combinePlan, comPl, index, incIndex);
+ } else {
+ int errCode = 2141;
+ String msg = "Internal Error. Cannot merge non-combiner with combiners for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ // merge the reducer plan
+ mergeOneReducePlanWithIndex(
+ mrOp.reducePlan, redPl, index, incIndex);
+
+ index = incIndex;
+
+ log.info("Merged MR job " + mrOp.getOperatorKey().getId()
+ + " into MR job " + splitter.getOperatorKey().getId());
+ }
+
+ PhysicalPlan splitterPl = splitter.mapPlan;
+ PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+ PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+ List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);
+
+ // replace store operator in the splitter with split operator
+ if (leaf instanceof POStore) {
+ splitOp.setInputs(storePreds);
+ try {
+ splitterPl.replace(storeOp, splitOp);
+ } catch (PlanException e) {
+ int errCode = 2132;
+ String msg = "Internal Error. Unable to replace store with split operator for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ splitter.setMapDone(true);
+ splitter.reducePlan = redPl;
+ splitter.setReduceDone(true);
+
+ if (comPl != null) {
+ splitter.combinePlan = comPl;
+ }
+
+ for (MapReduceOper mrOp : mergeList) {
+ removeAndReconnect(mrOp, splitter);
+ }
+
+ splitter.mapKeyType = sameKeyType ?
+ mergeList.get(0).mapKeyType : DataType.TUPLE;
+
+ log.info("Requested parallelism of splitter: "
+ + splitter.getRequestedParallelism());
}
private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
@@ -270,13 +690,8 @@
PhysicalPlan pl = mapReduce.mapPlan;
PhysicalOperator load = pl.getRoots().get(0);
pl.remove(load);
- try {
- splitOp.addPlan(pl);
- } catch (PlanException e) {
- int errCode = 2130;
- String msg = "Internal Error. Unable to merge split plans for optimization.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
+
+ splitOp.addPlan(pl);
splitter.setMapDone(true);
splitter.reducePlan = mapReduce.reducePlan;
@@ -301,6 +716,7 @@
/**
* Removes the specified MR operator from the plan after the merge.
* Connects its predecessors and successors to the merged MR operator
+ *
* @param mr the MR operator to remove
* @param newMR the MR operator to be connected to the predecessors and
* the successors of the removed operator
@@ -385,12 +801,21 @@
}
private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
- List<PhysicalOperator> roots = pl.getRoots();
- return (roots.size() == 1);
+ return (pl.getRoots().size() == 1);
}
private POSplit getSplit(){
- POSplit sp = new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
- return sp;
+ return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
}
+
+ private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){
+ PODemux demux = new PODemux(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ demux.setSameMapKeyType(sameMapKeyType);
+ demux.setInCombiner(inCombiner);
+ return demux;
+ }
+
+ private POMultiQueryPackage getMultiQueryPackage(){
+ return new POMultiQueryPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ }
}
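
The index bookkeeping is the subtle part of the merge above: POLocalRearrange flags multi-query indexes by setting the high bit, and PODemux/POMultiQueryPackage recover a local list position by masking that bit off and subtracting their base index. A standalone sketch of the arithmetic, with illustrative names (the constants mirror setIndex() and idxPart in the patch):

    // Sketch of the multi-query index encoding used by the optimizer.
    public class MultiQueryIndexSketch {

        // What POLocalRearrange.setMultiQueryIndex() effectively stores:
        // the plan index with the high bit set as a multi-query flag.
        static byte encode(int index) {
            if (index > 0x7F) {
                throw new IllegalArgumentException(
                    "merging more than 127 map-reduce jobs not supported");
            }
            return (byte) (index | 0x80);
        }

        // What PODemux/POMultiQueryPackage do with an incoming key index:
        // strip the flag, then shift by the operator's base index to get
        // a position into the local plan/package list.
        static int decode(byte keyIndex, int baseIndex) {
            return (keyIndex & 0x7F) - baseIndex;
        }

        public static void main(String[] args) {
            // A splittee at global index 3 inside a nested demux whose
            // baseIndex is 2 dispatches to local inner plan 1.
            System.out.println(decode(encode(3), 2)); // prints 1
        }
    }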
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
index 660e965..6877bfb 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
@@ -93,6 +93,12 @@
super.visitSplit(spl);
spl.setParentPlan(parent);
}
+
+ @Override
+ public void visitDemux(PODemux demux) throws VisitorException{
+ super.visitDemux(demux);
+ demux.setParentPlan(parent);
+ }
@Override
public void visitDistinct(PODistinct distinct) throws VisitorException {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
index e399cf5..d336653 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
@@ -80,6 +80,10 @@
public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
//do nothing
}
+
+ public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
+ //do nothing
+ }
public void visitPOForEach(POForEach nfe) throws VisitorException {
List<PhysicalPlan> inpPlans = nfe.getInputPlans();
@@ -103,6 +107,15 @@
}
}
+ public void visitDemux(PODemux demux) throws VisitorException{
+ List<PhysicalPlan> plans = demux.getPlans();
+ for (PhysicalPlan plan : plans) {
+ pushWalker(mCurrentWalker.spawnChildWalker(plan));
+ visit();
+ popWalker();
+ }
+ }
+
public void visitDistinct(PODistinct distinct) throws VisitorException {
//do nothing
}
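
Since visitDemux() above walks each inner plan with a child walker, subclasses get recursive traversal of merged plans for free. A hedged sketch of a visitor built on that hook (the counter class is illustrative, not part of the patch):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
    import org.apache.pig.impl.plan.PlanWalker;
    import org.apache.pig.impl.plan.VisitorException;

    // Illustrative PhyPlanVisitor subclass: counts demux inner plans,
    // relying on super.visitDemux() to walk into each of them.
    public class DemuxPlanCounter extends PhyPlanVisitor {

        private int count = 0;

        public DemuxPlanCounter(PhysicalPlan plan,
                PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
            super(plan, walker);
        }

        @Override
        public void visitDemux(PODemux demux) throws VisitorException {
            count += demux.getPlans().size();
            super.visitDemux(demux); // spawns a child walker per inner plan
        }

        public int getCount() {
            return count;
        }
    }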
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
index d2c6ebc..ad7f9ed 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
@@ -163,6 +163,15 @@
else if (node instanceof POSplit) {
sb.append(planString(((POSplit)node).getPlans()));
}
+ else if (node instanceof PODemux) {
+ sb.append(planString(((PODemux)node).getPlans()));
+ }
+ else if (node instanceof POMultiQueryPackage) {
+ List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
+ for (POPackage pkg : pkgs) {
+ sb.append(LSep + pkg.name() + "\n");
+ }
+ }
else if(node instanceof POFRJoin){
POFRJoin frj = (POFRJoin)node;
List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
new file mode 100644
index 0000000..9e90c3b
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The MapReduce Demultiplexer operator.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization.
+ * The reduce physical plans of the splittees become the inner plans
+ * of this operator.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another demux operator.
+ * <p>
+ * The predecessor of this operator must be a POMultiQueryPackage
+ * operator which passes the index (indicating which inner reduce plan to run)
+ * along with other data to this operator.
+ */
+public class PODemux extends PhysicalOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private static int idxPart = 0x7F;
+
+ private static Result empty = new Result(POStatus.STATUS_NULL, null);
+
+ private static Result eop = new Result(POStatus.STATUS_EOP, null);
+
+ private Log log = LogFactory.getLog(getClass());
+
+ /*
+ * The base index of this demux. In the case of
+ * a demux contained in another demux, the index
+ * passed in must be shifted before it can be used.
+ */
+ private int baseIndex = 0;
+
+ /*
+ * The list of sub-plans the inner plan is composed of
+ */
+ private ArrayList<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
+
+ /*
+ * Flag indicating when a new pull should start
+ */
+ private boolean getNext = true;
+
+ /*
+ * Flag indicating when a new pull should start.
+ * It's used only when receiving the call
+ * from reducer's close() method in the streaming case.
+ */
+ private boolean inpEOP = false;
+
+ /*
+ * The leaf of the current pipeline
+ */
+ private PhysicalOperator curLeaf = null;
+
+ /*
+ * Indicating if all the inner plans have the same
+ * map key type. If not, the keys passed in are
+ * wrapped inside tuples and need to be extracted
+ * out during the reduce phase
+ */
+ private boolean sameMapKeyType = true;
+
+ /*
+ * Indicating if this operator is in a combiner.
+ * If not, this operator is in a reducer and the key
+ * value must first be extracted from the wrapping tuple
+ * before being written out to disk
+ */
+ private boolean inCombiner = false;
+
+ BitSet processedSet = new BitSet();
+
+ /**
+ * Constructs an operator with the specified key.
+ *
+ * @param k the operator key
+ */
+ public PODemux(OperatorKey k) {
+ this(k, -1, null);
+ }
+
+ /**
+ * Constructs an operator with the specified key
+ * and degree of parallelism.
+ *
+ * @param k the operator key
+ * @param rp the degree of parallelism requested
+ */
+ public PODemux(OperatorKey k, int rp) {
+ this(k, rp, null);
+ }
+
+ /**
+ * Constructs an operator with the specified key and inputs.
+ *
+ * @param k the operator key
+ * @param inp the inputs that this operator will read data from
+ */
+ public PODemux(OperatorKey k, List<PhysicalOperator> inp) {
+ this(k, -1, inp);
+ }
+
+ /**
+ * Constructs an operator with the specified key,
+ * degree of parallelism and inputs.
+ *
+ * @param k the operator key
+ * @param rp the degree of parallelism requested
+ * @param inp the inputs that this operator will read data from
+ */
+ public PODemux(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visitDemux(this);
+ }
+
+ @Override
+ public String name() {
+ return "Demux - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ /**
+ * Sets the base index of this demux.
+ *
+ * @param idx the base index
+ */
+ public void setBaseIndex(int idx) {
+ baseIndex = idx;
+ }
+
+ /**
+ * Returns the base index of this demux
+ *
+ * @return the base index
+ */
+ public int getBaseIndex() {
+ return baseIndex;
+ }
+
+ /**
+ * Returns the list of inner plans.
+ *
+ * @return the list of the nested plans
+ */
+ public List<PhysicalPlan> getPlans() {
+ return myPlans;
+ }
+
+ /**
+ * Appends the specified plan at the end of the list.
+ *
+ * @param inPlan plan to be appended to the inner plan list
+ */
+ public void addPlan(PhysicalPlan inPlan) {
+ myPlans.add(inPlan);
+ processedSet.set(myPlans.size()-1);
+ }
+
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+
+ if (!inCombiner && this.parentPlan.endOfAllInput) {
+
+ // If there is a stream in one of the inner plans,
+ // there could potentially be more to process - the
+ // reducer sets the flag stating that all map input has
+ // been sent already and runs the pipeline one more time
+ // in its close() call.
+ return getStreamCloseResult();
+
+ } else {
+
+ if (getNext) {
+
+ Result inp = processInput();
+
+ if (inp.returnStatus == POStatus.STATUS_EOP) {
+ return inp;
+ }
+
+ curLeaf = attachInputWithIndex((Tuple)inp.result);
+
+ getNext = false;
+ }
+
+ return runPipeline(curLeaf);
+ }
+ }
+
+ private Result runPipeline(PhysicalOperator leaf) throws ExecException {
+
+ Result res = null;
+
+ while (true) {
+
+ res = leaf.getNext(dummyTuple);
+
+ if (res.returnStatus == POStatus.STATUS_OK ||
+ res.returnStatus == POStatus.STATUS_EOP ||
+ res.returnStatus == POStatus.STATUS_ERR) {
+ break;
+ } else if (res.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ }
+ }
+
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ getNext = true;
+ }
+
+ return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
+ }
+
+ private Result getStreamCloseResult() throws ExecException {
+ Result res = null;
+
+ while (true) {
+
+ if (processedSet.cardinality() == myPlans.size()) {
+ curLeaf = null;
+ Result inp = processInput();
+ if (inp.returnStatus == POStatus.STATUS_OK) {
+ attachInputWithIndex((Tuple)inp.result);
+ inpEOP = false;
+ } else if (inp.returnStatus == POStatus.STATUS_EOP){
+ inpEOP = true;
+ } else if (inp.returnStatus == POStatus.STATUS_NULL) {
+ inpEOP = false;
+ } else if (inp.returnStatus == POStatus.STATUS_ERR) {
+ return inp;
+ }
+ processedSet.clear();
+ }
+
+ int idx = processedSet.nextClearBit(0);
+ PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
+
+ // a nested demux object is stored in multiple positions
+ // of the inner plan list, corresponding to the indexes of
+ // its inner plans; skip the object if it's already processed.
+ if (curLeaf != null && leaf.getOperatorKey().equals(curLeaf.getOperatorKey())) {
+ processedSet.set(idx++);
+ if (idx < myPlans.size()) {
+ continue;
+ } else {
+ res = eop;
+ }
+ } else {
+ curLeaf = leaf;
+ res = leaf.getNext(dummyTuple);
+
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ processedSet.set(idx++);
+ if (idx < myPlans.size()) {
+ continue;
+ }
+ } else {
+ break;
+ }
+
+ }
+
+ if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) {
+ continue;
+ } else {
+ break;
+ }
+ }
+
+ return res;
+ }
+
+ private PhysicalOperator attachInputWithIndex(Tuple res) throws ExecException {
+
+ // unwrap the key to get the wrapped value which
+ // is expected by the inner plans
+ PigNullableWritable key = (PigNullableWritable)res.get(0);
+
+ // choose an inner plan to run based on the index set by
+ // the POLocalRearrange operator and passed to this operator
+ // by POMultiQueryPackage
+ int index = key.getIndex();
+ index &= idxPart;
+ index -= baseIndex;
+
+ PhysicalPlan pl = myPlans.get(index);
+ if (!(pl.getRoots().get(0) instanceof PODemux)) {
+ if (!sameMapKeyType && !inCombiner) {
+ Tuple tup = (Tuple)key.getValueAsPigType();
+ res.set(0, tup.get(0));
+ } else {
+ res.set(0, key.getValueAsPigType());
+ }
+ }
+
+ myPlans.get(index).attachInput(res);
+ return myPlans.get(index).getLeaves().get(0);
+ }
+
+ /**
+ * Sets a flag indicating if all inner plans have
+ * the same map key type.
+ *
+ * @param sameMapKeyType true if all inner plans have
+ * the same map key type; otherwise false
+ */
+ public void setSameMapKeyType(boolean sameMapKeyType) {
+ this.sameMapKeyType = sameMapKeyType;
+ }
+
+ /**
+ * Returns a flag indicating if all inner plans
+ * have the same map key type
+ *
+ * @return true if all inner plans have
+ * the same map key type; otherwise false
+ */
+ public boolean isSameMapKeyType() {
+ return sameMapKeyType;
+ }
+
+ /**
+ * Sets a flag indicating if this operator is
+ * in a combiner.
+ *
+ * @param inCombiner true if this operator is in
+ * a combiner; false if this operator is in a reducer
+ */
+ public void setInCombiner(boolean inCombiner) {
+ this.inCombiner = inCombiner;
+ }
+
+ /**
+ * Returns a flag indicating if this operator is
+ * in a combiner.
+ *
+ * @return true if this operator is in a combiner;
+ * otherwise this operator is in a reducer
+ */
+ public boolean isInCombiner() {
+ return inCombiner;
+ }
+
+}
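
One detail worth spelling out: mergeOneReducePlanWithIndex() in the optimizer adds a merged sub-plan to this demux once per index it covers, so list position stays equal to the shifted key index even after recursive merges; getStreamCloseResult() then compares operator keys so a duplicated leaf runs only once. A toy model of the slot trick (names illustrative):

    import java.util.ArrayList;
    import java.util.List;

    // Toy model: a nested sub-plan covering key indexes 1 and 2 occupies
    // two slots, so plans.get(keyIndex - baseIndex) still lands on it.
    public class NestedSlotSketch {
        public static void main(String[] args) {
            List<String> plans = new ArrayList<String>();
            plans.add("leafPlan0");          // key index 0
            String nested = "nestedDemux";   // covers key indexes 1 and 2
            plans.add(nested);               // slot 1
            plans.add(nested);               // slot 2, same object again
            for (int keyIndex = 0; keyIndex <= 2; keyIndex++) {
                System.out.println(keyIndex + " -> " + plans.get(keyIndex));
            }
        }
    }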
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
index 24fd257..fc17e9b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
@@ -154,18 +154,41 @@
return index;
}
+ /**
+ * Sets the co-group index of this operator
+ *
+ * @param index the position of this operator in
+ * a co-group operation
+ * @throws ExecException if the index value is bigger than 0x7F
+ */
public void setIndex(int index) throws ExecException {
- if (index > 0x7F) {
- int errCode = 1082;
- String msg = "Cogroups with more than 127 inputs "
- + " not supported.";
- throw new ExecException(msg, errCode, PigException.INPUT);
- } else {
- this.index = (byte)index;
- }
- lrOutput.set(0, new Byte(this.index));
+ setIndex(index, false);
}
+ /**
+ * Sets the multi-query index of this operator
+ *
+ * @param index the position of the parent plan of this operator
+ * in the enclosing split operator
+ * @throws ExecException if the index value is bigger than 0x7F
+ */
+ public void setMultiQueryIndex(int index) throws ExecException {
+ setIndex(index, true);
+ }
+
+ private void setIndex(int index, boolean multiQuery) throws ExecException {
+ if (index > 0x7F) {
+ int errCode = 1082;
+ String msg = multiQuery ?
+ "Merge more than 127 map-reduce jobs not supported."
+ : "Cogroups with more than 127 inputs not supported.";
+ throw new ExecException(msg, errCode, PigException.INPUT);
+ } else {
+ this.index = multiQuery ? (byte)(index | 0x80) : (byte)index;
+ }
+ lrOutput.set(0, new Byte(this.index));
+ }
+
public boolean isDistinct() {
return mIsDistinct;
}
@@ -255,12 +278,26 @@
protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
//Construct key
Object key;
+
if(resLst.size()>1){
Tuple t = mTupleFactory.newTuple(resLst.size());
int i=-1;
for(Result res : resLst)
t.set(++i, res.result);
- key = t;
+ key = t;
+ } else if (resLst.size() == 1 && keyType == DataType.TUPLE) {
+
+ // We get here after merging multiple jobs that have different
+ // map key types into a single job during multi-query optimization.
+ // If the key isn't a tuple, it must be wrapped in a tuple.
+ Object obj = resLst.get(0).result;
+ if (obj instanceof Tuple) {
+ key = (Tuple)obj;
+ } else {
+ Tuple t = mTupleFactory.newTuple(1);
+ t.set(0, resLst.get(0).result);
+ key = t;
+ }
}
else{
key = resLst.get(0).result;
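
The new branch in constructLROutput() above is what lets jobs with different map key types merge: once the merged job's key type is forced to DataType.TUPLE, a scalar key has to travel inside a one-field tuple so every record carries a tuple key, and PODemux unwraps it again on the reduce side. A minimal sketch of that wrap/unwrap pair using Pig's TupleFactory (the class and method names are illustrative):

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Sketch of the key wrapping done in constructLROutput() and the
    // matching unwrap done by PODemux when sameMapKeyType is false.
    public class KeyWrapSketch {
        private static final TupleFactory tf = TupleFactory.getInstance();

        static Object wrapKey(Object key) throws ExecException {
            if (key instanceof Tuple) {
                return key;                 // already a tuple, pass through
            }
            Tuple t = tf.newTuple(1);
            t.set(0, key);                  // wrap the scalar key in a tuple
            return t;
        }

        static Object unwrapKey(Object key) throws ExecException {
            return ((Tuple) key).get(0);    // what the demux restores
        }
    }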
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
new file mode 100644
index 0000000..c3df038
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The package operator that packages the globally rearranged tuples
+ * into the output format required by the multi-query demultiplexer.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization.
+ * The package operators of the reduce plans of the splittees form an
+ * indexed package list inside this operator. When this operator
+ * receives an input, it extracts the index from the key and calls the
+ * corresponding package to get the output data.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another multi-query packager.
+ * <p>
+ * The successor of this operator must be a PODemux operator which
+ * knows how to consume the output of this operator.
+ */
+public class POMultiQueryPackage extends POPackage {
+
+ private static final long serialVersionUID = 1L;
+
+ private static int idxPart = 0x7F;
+
+ private final Log log = LogFactory.getLog(getClass());
+
+ private List<POPackage> packages = new ArrayList<POPackage>();
+
+ private PigNullableWritable myKey;
+
+ private int baseIndex = 0;
+
+ /**
+ * Constructs an operator with the specified key.
+ *
+ * @param k the operator key
+ */
+ public POMultiQueryPackage(OperatorKey k) {
+ this(k, -1, null);
+ }
+
+ /**
+ * Constructs an operator with the specified key
+ * and degree of parallelism.
+ *
+ * @param k the operator key
+ * @param rp the degree of parallelism requested
+ */
+ public POMultiQueryPackage(OperatorKey k, int rp) {
+ this(k, rp, null);
+ }
+
+ /**
+ * Constructs an operator with the specified key and inputs.
+ *
+ * @param k the operator key
+ * @param inp the inputs that this operator will read data from
+ */
+ public POMultiQueryPackage(OperatorKey k, List<PhysicalOperator> inp) {
+ this(k, -1, inp);
+ }
+
+ /**
+ * Constructs an operator with the specified key,
+ * degree of parallelism and inputs.
+ *
+ * @param k the operator key
+ * @param rp the degree of parallelism requested
+ * @param inp the inputs that this operator will read data from
+ */
+ public POMultiQueryPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ }
+
+ @Override
+ public String name() {
+ return "MultiQuery Package - " + getOperatorKey().toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visitMultiQueryPackage(this);
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
+ tupIter = inp;
+ myKey = k;
+ }
+
+ @Override
+ public void detachInput() {
+ tupIter = null;
+ myKey = null;
+ }
+
+ /**
+ * Appends the specified package object to the end of
+ * the package list.
+ *
+ * @param pack package to be appended to the list
+ */
+ public void addPackage(POPackage pack) {
+ packages.add(pack);
+ }
+
+ /**
+ * Returns the list of packages.
+ *
+ * @return the list of the packages
+ */
+ public List<POPackage> getPackages() {
+ return packages;
+ }
+
+ /**
+ * Constructs the output tuple from the inputs.
+ * <p>
+ * The output is consumed by the demultiplexer operator
+ * (PODemux) in the format (key, {bag of tuples}) where the key
+ * is the indexed WritableComparable, not its value unwrapped as a pig type.
+ */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+
+ int index = myKey.getIndex();
+ index &= idxPart;
+ index -= baseIndex;
+
+ if (index >= packages.size() || index < 0) {
+ int errCode = 2140;
+ String msg = "Invalid package index " + index
+ + " should be in the range between 0 and " + packages.size();
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ POPackage pack = packages.get(index);
+
+ pack.attachInput(myKey, tupIter);
+
+ Result res = pack.getNext(t);
+
+ Tuple tuple = (Tuple)res.result;
+
+ // replace the wrapped value in the key with the key itself
+ tuple.set(0, myKey);
+
+ return res;
+ }
+
+ /**
+ * Sets the base index of this operator
+ *
+ * @param baseIndex the base index of this operator
+ */
+ public void setBaseIndex(int baseIndex) {
+ this.baseIndex = baseIndex;
+ }
+
+ /**
+ * Returns the base index of this operator
+ *
+ * @return the base index of this operator
+ */
+ public int getBaseIndex() {
+ return baseIndex;
+ }
+}
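
Two aspects of getNext() above deserve emphasis: the package is chosen purely by the masked, base-shifted key index, and the result tuple deliberately keeps the indexed PigNullableWritable in field 0 (tuple.set(0, myKey)) because the PODemux that consumes it still needs the index to pick an inner plan. A toy model of the dispatch, with illustrative names:

    import java.util.Arrays;
    import java.util.List;

    // Toy model of POMultiQueryPackage dispatch: bounds-check the local
    // index, then pick the package the indexed key routes to.
    public class PackageDispatchSketch {
        public static void main(String[] args) {
            List<String> packages = Arrays.asList("pkg0", "pkg1", "pkg2");
            int baseIndex = 0;
            byte keyIndex = (byte) 0x82;              // flag 0x80 | index 2
            int index = (keyIndex & 0x7F) - baseIndex;
            if (index < 0 || index >= packages.size()) {
                throw new IllegalStateException("invalid package index " + index);
            }
            System.out.println(packages.get(index));  // prints pkg2
        }
    }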
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
index d148f45..afba7d9 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
@@ -206,8 +206,18 @@
while (tupIter.hasNext()) {
NullableTuple ntup = tupIter.next();
int index = ntup.getIndex();
- Tuple copy = getValueTuple(ntup, index);
- dbs[index].add(copy);
+ Tuple copy = getValueTuple(ntup, index);
+
+ if (numInputs == 1) {
+
+ // this is for multi-query merge where
+ // the numInputs is always 1, but the index
+ // (the position of the inner plan in the
+ // enclosing operator) may not be 0.
+ dbs[0].add(copy);
+ } else {
+ dbs[index].add(copy);
+ }
if(reporter!=null) reporter.progress();
}
@@ -240,21 +250,15 @@
// Need to make a copy of the value, as hadoop uses the same ntup
// to represent each value.
Tuple val = (Tuple)ntup.getValueAsPigType();
- /*
- Tuple copy = mTupleFactory.newTuple(val.size());
- for (int i = 0; i < val.size(); i++) {
- copy.set(i, val.get(i));
- }
- */
Tuple copy = null;
// The "value (val)" that we just got may not
// be the complete "value". It may have some portions
// in the "key" (look in POLocalRearrange for more comments)
// If this is the case we need to stitch
- // the "value" together.
- Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
- keyInfo.get(index);
+ // the "value" together.
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+ keyInfo.get(index);
boolean isProjectStar = lrKeyInfo.first;
Map<Integer, Integer> keyLookup = lrKeyInfo.second;
int keyLookupSize = keyLookup.size();
@@ -364,5 +368,4 @@
this.distinct = distinct;
}
-
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
index 773a376..e472265 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
@@ -32,11 +32,11 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
/**
* The MapReduce Split operator.
+ * <p>
* The assumption here is that
* the logical to physical translation
* will create this dummy operator with
@@ -56,7 +56,7 @@
* This is different than the existing implementation
* where the POSplit writes to sidefiles after filtering
* and then loads the appropriate file.
- *
+ * <p>
* The approach followed here is as good as the old
* approach if not better in many cases because
* of the availability of attachinInputs. An optimization
@@ -185,7 +185,7 @@
* the nested input plan list
* @param inPlan plan to be appended to the list
*/
- public void addPlan(PhysicalPlan inPlan) throws PlanException {
+ public void addPlan(PhysicalPlan inPlan) {
myPlans.add(inPlan);
processedSet.set(myPlans.size()-1);
}
diff --git a/src/org/apache/pig/impl/io/PigNullableWritable.java b/src/org/apache/pig/impl/io/PigNullableWritable.java
index 0495c94..00f0a95 100644
--- a/src/org/apache/pig/impl/io/PigNullableWritable.java
+++ b/src/org/apache/pig/impl/io/PigNullableWritable.java
@@ -36,12 +36,16 @@
*/
public abstract class PigNullableWritable implements WritableComparable {
+ private static byte mqFlag = (byte)0x80;
+
+ private static byte idxSpace = (byte)0x7F;
+
private boolean mNull;
protected WritableComparable mValue;
private byte mIndex;
-
+
/**
* Compare two nullable objects. Step one is to check if either or both
* are null. If one is null and the other is not, then the one that is
@@ -54,12 +58,19 @@
*/
public int compareTo(Object o) {
PigNullableWritable w = (PigNullableWritable)o;
+
+ if ((mIndex & mqFlag) != 0) { // this is a multi-query index
+
+ if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
+ else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
+ }
+
if (!mNull && !w.mNull) {
return mValue.compareTo(w.mValue);
} else if (mNull && w.mNull) {
// If they're both null, compare the indices
- if (mIndex < w.mIndex) return -1;
- else if (mIndex > w.mIndex) return 1;
+ if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
+ else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
else return 0;
}
else if (mNull) return -1;
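
The compareTo() change above gives multi-query keys a two-level order: records sort by the 7-bit plan index first, and only records from the same merged sub-job fall through to the value comparison, so one sort/shuffle can serve several merged group-bys without interleaving their key spaces. A small sketch of the index comparison on raw bytes (illustrative, outside the patch):

    // Sketch of the two-level comparison in PigNullableWritable.compareTo()
    // for multi-query keys, using raw index bytes for illustration.
    public class MultiQueryCompareSketch {
        static int compareIndexed(byte a, byte b) {
            if ((a & 0x80) != 0) {              // multi-query flag set
                int ia = a & 0x7F, ib = b & 0x7F;
                if (ia != ib) {
                    return ia < ib ? -1 : 1;    // order by plan index first
                }
            }
            return 0;                           // fall through to value compare
        }

        public static void main(String[] args) {
            // index 1 sorts before index 2 regardless of the key values
            System.out.println(compareIndexed((byte) 0x81, (byte) 0x82)); // -1
            System.out.println(compareIndexed((byte) 0x82, (byte) 0x82)); //  0
        }
    }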
diff --git a/test/org/apache/pig/test/TestMultiQuery.java b/test/org/apache/pig/test/TestMultiQuery.java
index 8d6f369..c2ae3e0 100644
--- a/test/org/apache/pig/test/TestMultiQuery.java
+++ b/test/org/apache/pig/test/TestMultiQuery.java
@@ -181,6 +181,342 @@
}
@Test
+ public void testMultiQueryPhase3BaseCase() {
+
+ System.out.println("===== multi-query phase 3 base case =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3BaseCase2() {
+
+ System.out.println("===== multi-query phase 3 base case (2) =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithoutCombiner() {
+
+ System.out.println("===== multi-query phase 3 without combiner =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithoutCombiner2() {
+
+ System.out.println("===== multi-query phase 3 without combiner (2) =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithMixedCombiner() {
+
+ System.out.println("===== multi-query phase 3 with mixed combiner =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 2);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithMixedCombiner2() {
+
+ System.out.println("===== multi-query phase 3 with mixed combiner (2) =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithDifferentMapDataTypes() {
+
+ System.out.println("===== multi-query phase 3 with different map datatypes =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid parallel 2;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by $1 parallel 3;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by $1 parallel 4;");
+ myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithDifferentMapDataTypes2() {
+
+ System.out.println("===== multi-query phase 3 with different map datatypes (2) =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by $1;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by $1;");
+ myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3StreamingInReducer() {
+
+ System.out.println("===== multi-query phase 3 with streaming in reducer =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+ myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;");
+ myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+ myPig.registerQuery("B = group A3 by $2;");
+ myPig.registerQuery("C = foreach B generate flatten(A3);");
+ myPig.registerQuery("D = stream B through `cat`;");
+ myPig.registerQuery("store D into '/tmp/output1';");
+ myPig.registerQuery("E = group A4 by $2;");
+ myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+ myPig.registerQuery("store F into '/tmp/output2';");
+ myPig.registerQuery("G = group A1 by $2;");
+ myPig.registerQuery("H = foreach G generate group, COUNT(A1);");
+ myPig.registerQuery("store H into '/tmp/output3';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 3, 16);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 24);
+
+ checkMRPlan(pp, 1, 1, 2);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3StreamingInReducer2() {
+
+ System.out.println("===== multi-query phase 3 with streaming in reducer (2) =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+ myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;");
+ myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+ myPig.registerQuery("B = group A3 by $2;");
+ myPig.registerQuery("C = foreach B generate flatten(A3);");
+ myPig.registerQuery("D = stream B through `cat`;");
+ myPig.registerQuery("store D into '/tmp/output1';");
+ myPig.registerQuery("E = group A4 by $2;");
+ myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+ myPig.registerQuery("store F into '/tmp/output2';");
+ myPig.registerQuery("G = group A1 by $2;");
+ myPig.registerQuery("H = foreach G generate group, COUNT(A1);");
+ myPig.registerQuery("store H into '/tmp/output3';");
+
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
public void testMultiQueryWithPigMixL12() {
System.out.println("===== multi-query with PigMix L12 =====");
@@ -207,7 +543,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 23);
- checkMRPlan(pp, 1, 2, 3);
+ checkMRPlan(pp, 1, 1, 1);
} catch (Exception e) {
e.printStackTrace();
@@ -695,7 +1031,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
- checkMRPlan(pp, 1, 2, 3);
+ checkMRPlan(pp, 1, 1, 1);
} catch (Exception e) {
e.printStackTrace();