PIG-627 Multi-query M3, support for multiple group-bys in one MR job.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/pig/branches/multiquery@769498 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 7fc41e4..4f6df08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -613,3 +613,5 @@
     PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
 
     PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
+
+    PIG-627: multiquery support M3 (rding via gates)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
index 7d9e0b5..89f194b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
@@ -238,6 +238,8 @@
         NoopFilterRemover fRem = new NoopFilterRemover(plan);
         fRem.visit();
         
+        // reduce the number of MROpers in the MR plan generated 
+        // by a multi-query (multi-store) script
         MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
         mqOptimizer.visit();
 
@@ -246,7 +248,7 @@
         // NoopFilterRemover.
         NoopStoreRemover sRem = new NoopStoreRemover(plan);
         sRem.visit();
-
+      
         // check whether stream operator is present
         // after MultiQueryOptimizer because it can shift streams from
         // map to reduce, etc.
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
index 05f4ce0..9fc377a 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
@@ -19,13 +19,19 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.DataType;
@@ -34,24 +40,28 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.PigException;
 
+
 /** 
 * An optimizer that merges all or some of the splittee MapReduceOpers into 
- * splitter MapReduceOper. The merge can produce a MROperPlan that has 
+ * the splitter MapReduceOper. 
+ * <p>
+ * The merge can produce an MROperPlan with 
 * fewer MapReduceOpers than the original MROperPlan.
- * 
+ * <p>
 * The MRCompiler generates multiple MapReduceOpers whenever it encounters 
  * a split operator and connects the single splitter MapReduceOper to 
  * one or more splittee MapReduceOpers using store/load operators:  
- *  
+ * <p>
  *     ---- POStore (in splitter) -... ----
  *     |        |    ...    |
  *     |        |    ...    |
  *    POLoad  POLoad ...  POLoad (in splittees)
  *      |        |           |
- *      
+ * <p>   
 *  This optimizer merges those MapReduceOpers by replacing the POLoad/POStore 
 *  combination with a POSplit operator.    
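+ * <p>
+ *  For example (a hypothetical script, not part of this patch), a 
+ *  multi-query script with two group-bys over the same input yields 
+ *  one splitter and two map-reduce splittees that this optimizer can 
+ *  merge into a single MR job:
+ * <p>
+ *     A = LOAD 'input' AS (x, y, z);
+ *     B = GROUP A BY x;  STORE B INTO 'out1';
+ *     C = GROUP A BY y;  STORE C INTO 'out2';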
  */
@@ -68,15 +78,24 @@
         nig = NodeIdGenerator.getGenerator();
         List<MapReduceOper> roots = plan.getRoots();
         scope = roots.get(0).getOperatorKey().getScope();
+        
+        log.info("MR plan size before optimization: " + plan.size());
     }
 
     @Override
+    public void visit() throws VisitorException {
+        super.visit();
+        
+        log.info("MR plan size after optimization: " + mPlan.size());
+    }
+    
+    @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-       
+        
         if (!mr.isSplitter()) {
             return;
-        }
-       
+        }       
+        
         // first classify all the splittees
         List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
         List<MapReduceOper> multiLoadMROpers = new ArrayList<MapReduceOper>();
@@ -86,23 +105,26 @@
         for (MapReduceOper successor : successors) {
             if (isMapOnly(successor)) {
                 if (isSingleLoadMapperPlan(successor.mapPlan)) {                    
-                    mappers.add(successor);
+                    mappers.add(successor);                
                 } else {                    
                     multiLoadMROpers.add(successor);
                 }
             } else {
                 if (isSingleLoadMapperPlan(successor.mapPlan)) {                     
-                    mapReducers.add(successor);
+                    mapReducers.add(successor);                  
                 } else {                    
-                    multiLoadMROpers.add(successor);
+                    multiLoadMROpers.add(successor);                      
                 }
             }                
         }
-              
+                      
         // case 1: exactly one splittee and it's map-only
         if (mappers.size() == 1 && mapReducers.size() == 0 
                 && multiLoadMROpers.size() == 0 ) {            
             mergeOnlyMapperSplittee(mappers.get(0), mr);
+            
+            log.info("Merged the only map-only splittee.");
+            
             return;              
         }
         
@@ -110,9 +132,15 @@
         if (isMapOnly(mr) && mapReducers.size() == 1 
                 && mappers.size() == 0 && multiLoadMROpers.size() == 0) {            
             mergeOnlyMapReduceSplittee(mapReducers.get(0), mr);
+            
+            log.info("Merged the only map-reduce splittee.");
+            
             return;
         } 
         
+        int numSplittees = successors.size();
+        int numMerges = 0;
+        
         PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;                            
         POStore storeOp = (POStore)splitterPl.getLeaves().get(0); 
         
@@ -120,38 +148,32 @@
         
         // case 3: multiple splittees and at least one of them is map-only
         if (mappers.size() > 0) {                
-            splitOp = getSplit();                
-            mergeAllMapOnlySplittees(mappers, mr, splitOp);
+            splitOp = getSplit();  
+            int n = mergeAllMapOnlySplittees(mappers, mr, splitOp);            
+            
+            log.info("Merged " + n + " map-only splittees.");
+            
+            numMerges += n;   
         }            
-               
-        boolean splitterMapOnly = isMapOnly(mr);
-        
-        // case 4: multiple splittees and at least one of them has reducer  
-        if (splitterMapOnly && mapReducers.size() > 0) {
-           
-            // pick one to merge, prefer one that has a combiner
-            MapReduceOper mapReducer= mapReducers.get(0);
-            for (MapReduceOper mro : mapReducers) {
-                if (!mro.combinePlan.isEmpty()) {
-                    mapReducer = mro;
-                    break;
-                }
-            }
               
+        // case 4: multiple splittees and at least one of them has reducer  
+        if (isMapOnly(mr) && mapReducers.size() > 0) {
+                         
             PhysicalOperator leaf = splitterPl.getLeaves().get(0);
                                                             
-            splitOp = (leaf instanceof POStore) ? 
-                    getSplit() : (POSplit)leaf;
+            splitOp = (leaf instanceof POStore) ? getSplit() : (POSplit)leaf;
                     
-            mergeSingleMapReduceSplittee(mapReducer, mr, splitOp);                
+            int n = mergeMapReduceSplittees(mapReducers, mr, splitOp);  
+            
+            log.info("Merged " + n + " map-reduce splittees.");
+            
+            numMerges += n;      
         }
-        
+       
         // finally, add original store to the split operator 
         // if there is splittee that hasn't been merged
         if (splitOp != null 
-                && ((multiLoadMROpers.size() > 0)
-                        || (mapReducers.size() > 1) 
-                        || (!splitterMapOnly && mapReducers.size() > 0))) {
+                && (numMerges < numSplittees)) {
 
             PhysicalPlan storePlan = new PhysicalPlan();
             try {
@@ -161,13 +183,17 @@
                 int errCode = 2129;
                 String msg = "Internal Error. Unable to add store to the split plan for optimization.";
                 throw new OptimizerException(msg, errCode, PigException.BUG, e);
-            }                               
+            }    
         }
+        
+        log.info("Merged " + numMerges + " out of total " 
+                + numSplittees + " splittees.");
     }                
-   
+    
     private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter)
     throws VisitorException {
-        PhysicalPlan splitterPl = isMapOnly(splitter) ? splitter.mapPlan : splitter.reducePlan;
+        PhysicalPlan splitterPl = isMapOnly(splitter) ? 
+                splitter.mapPlan : splitter.reducePlan;
         POStore storeOp = (POStore)splitterPl.getLeaves().get(0);
         List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);           
         
@@ -203,14 +229,14 @@
         }
     }
 
-    private void mergeOnlyMapperSplittee(MapReduceOper mapper, MapReduceOper splitter) 
-    throws VisitorException {
+    private void mergeOnlyMapperSplittee(MapReduceOper mapper, 
+            MapReduceOper splitter) throws VisitorException {
         mergeOneMapPart(mapper, splitter);       
         removeAndReconnect(mapper, splitter);
     }
     
-    private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, MapReduceOper splitter)
-    throws VisitorException {
+    private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, 
+            MapReduceOper splitter) throws VisitorException {
         mergeOneMapPart(mapReducer, splitter);
 
         splitter.setMapDone(true);
@@ -220,7 +246,7 @@
         removeAndReconnect(mapReducer, splitter);          
     }
     
-    private void mergeAllMapOnlySplittees(List<MapReduceOper> mappers, 
+    private int mergeAllMapOnlySplittees(List<MapReduceOper> mappers, 
             MapReduceOper splitter, POSplit splitOp) throws VisitorException {
         
         PhysicalPlan splitterPl = isMapOnly(splitter) ? 
@@ -229,22 +255,16 @@
         List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);  
 
         // merge splitee's map plans into nested plan of 
-        // the splitter operator
+        // the split operator
         for (MapReduceOper mapper : mappers) {                                
             PhysicalPlan pl = mapper.mapPlan;
             PhysicalOperator load = pl.getRoots().get(0);
-            pl.remove(load);
-            try {
-                splitOp.addPlan(pl);
-            } catch (PlanException e) {
-                int errCode = 2130;
-                String msg = "Internal Error. Unable to merge split plans for optimization.";
-                throw new OptimizerException(msg, errCode, PigException.BUG, e);
-            }
+            pl.remove(load);                   
+            splitOp.addPlan(pl);
         }
                            
         // replace store operator in the splitter with split operator
-        splitOp.setInputs(storePreds);
+        splitOp.setInputs(storePreds);    
         try {
             splitterPl.replace(storeOp, splitOp);
         } catch (PlanException e) {
@@ -257,6 +277,406 @@
         for (MapReduceOper mapper : mappers) {
             removeAndReconnect(mapper, splitter);                  
         }
+        
+        return mappers.size();
+    }
+    
+    private boolean isSplitteeMergeable(MapReduceOper splittee) {
+        
+        // cannot be global sort or limit after sort, they are 
+        // using a different partitioner
+        if (splittee.isGlobalSort() || splittee.isLimitAfterSort()) {
+            log.info("Cannot merge this splittee: " +
+                    "it is a global sort or limit after sort");
+            return false;
+        }
+        
+        // check the plan leaf: only merge local rearrange or split
+        PhysicalOperator leaf = splittee.mapPlan.getLeaves().get(0);
+        if (!(leaf instanceof POLocalRearrange) && 
+                ! (leaf instanceof POSplit)) {
+            log.info("Cannot merge this splittee: " +
+                    "its map plan doesn't end with an LR or Split operator: " 
+                    + leaf.getClass().getName());
+            return false;
+        }
+           
+        // cannot have a distinct combiner; it uses a different combiner
+        if (splittee.needsDistinctCombiner()) {
+            log.info("Cannot merge this splittee: " +
+                    "it has a distinct combiner.");
+            return false;           
+        }
+        
+        return true;
+    }
+       
+    private List<MapReduceOper> getMergeList(List<MapReduceOper> mapReducers) {
+        List<MapReduceOper> mergeNoCmbList = new ArrayList<MapReduceOper>();
+        List<MapReduceOper> mergeCmbList = new ArrayList<MapReduceOper>();
+
+        for (MapReduceOper mrOp : mapReducers) {
+            if (isSplitteeMergeable(mrOp)) {
+                if (mrOp.combinePlan.isEmpty()) {
+                    mergeNoCmbList.add(mrOp);
+                } else {
+                    mergeCmbList.add(mrOp);
+                } 
+            }           
+        }     
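+        // A splittee with a combiner cannot share a merged job with one
+        // that has none, so return whichever group is larger to maximize
+        // the number of merges.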
+        return (mergeNoCmbList.size() > mergeCmbList.size()) ?
+                mergeNoCmbList : mergeCmbList;
+    }
+    
+    private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers, 
+            MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+                
+        List<MapReduceOper> mergeList = getMergeList(mapReducers);
+    
+        if (mergeList.size() <= 1) {
+
+            // choose one to merge, preferring the one with a combiner
+            MapReduceOper mapReducer = mapReducers.get(0);
+            for (MapReduceOper mro : mapReducers) {
+                if (!mro.combinePlan.isEmpty()) {
+                    mapReducer = mro;
+                    break;
+                }
+            }
+            mergeList.clear();
+            mergeList.add(mapReducer);
+        } 
+                         
+        if (mergeList.size() == 1) {
+            mergeSingleMapReduceSplittee(mergeList.get(0), splitter, splitOp);
+        } else {                                   
+            mergeAllMapReduceSplittees(mergeList, splitter, splitOp);
+        }
+        
+        return mergeList.size();
+    }
+    
+    private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
+        boolean sameKeyType = true;
+        for (MapReduceOper outer : splittees) {
+            for (MapReduceOper inner : splittees) {
+                if (inner.mapKeyType != outer.mapKeyType) {
+                    sameKeyType = false;
+                    break;
+                }
+            }
+            if (!sameKeyType) break;
+        }      
+ 
+        return sameKeyType;
+    }
+    
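+    /*
+     * Recursively walks the nested plans of a POSplit and assigns each
+     * POLocalRearrange leaf the next multi-query index; returns the 
+     * first index that is still unassigned.
+     */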
+    private int setIndexOnLRInSplit(int initial, POSplit splitOp)
+            throws VisitorException {
+        int index = initial;
+        
+        List<PhysicalPlan> pls = splitOp.getPlans();
+        for (PhysicalPlan pl : pls) {
+            PhysicalOperator leaf = pl.getLeaves().get(0);
+            if (leaf instanceof POLocalRearrange) {
+                POLocalRearrange lr = (POLocalRearrange)leaf;
+                try {
+                    lr.setMultiQueryIndex(index++); 
+                } catch (ExecException e) {                    
+                    int errCode = 2136;
+                    String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);                   
+                }
+            } else if (leaf instanceof POSplit) {
+                POSplit spl = (POSplit)leaf;
+                index = setIndexOnLRInSplit(index, spl);
+            }
+        }
+
+        return index;
+    }
+    
+    private int mergeOneMapPlanWithIndex(PhysicalPlan pl, POSplit splitOp, 
+            int index, boolean sameKeyType) throws VisitorException {        
+        PhysicalOperator load = pl.getRoots().get(0);
+        pl.remove(load);
+                
+        int curIndex = index;
+        
+        PhysicalOperator leaf = pl.getLeaves().get(0);
+        if (leaf instanceof POLocalRearrange) {
+            POLocalRearrange lr = (POLocalRearrange)leaf;
+            try {
+                lr.setMultiQueryIndex(curIndex++);  
+            } catch (ExecException e) {                                      
+                int errCode = 2136;
+                String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+            
+            // change the map key type to tuple when 
+            // multiple splittees have different map key types
+            if (!sameKeyType) {
+                lr.setKeyType(DataType.TUPLE);
+            }
+        } else if (leaf instanceof POSplit) {
+            POSplit spl = (POSplit)leaf;
+            curIndex = setIndexOnLRInSplit(index, spl);
+        }
+                    
+        splitOp.addPlan(pl);
+               
+        return curIndex;
+    }
+    
+    private int setBaseIndexOnDemux(int initial, PODemux demuxOp) 
+            throws VisitorException {
+        int index = initial;
+        demuxOp.setBaseIndex(index++);
+
+        List<PhysicalPlan> pls = demuxOp.getPlans();
+        for (PhysicalPlan pl : pls) {
+            PhysicalOperator leaf = pl.getLeaves().get(0);
+            if (leaf instanceof POLocalRearrange) {
+                POLocalRearrange lr = (POLocalRearrange)leaf;
+                try {                    
+                    lr.setMultiQueryIndex(initial + lr.getIndex());                   
+                } catch (ExecException e) {                   
+                    int errCode = 2136;
+                    String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);                         
+                }   
+            }
+            PhysicalOperator root = pl.getRoots().get(0);
+            if (root instanceof PODemux) {                
+                index = setBaseIndexOnDemux(index, (PODemux)root);
+            } else {
+                index++;
+            }
+        }
+        return index;
+    }
+    
+    private int setBaseIndexOnPackage(int initial, POMultiQueryPackage pkgOp) {
+        int index = initial;
+        pkgOp.setBaseIndex(index++);
+        
+        List<POPackage> pkgs = pkgOp.getPackages();
+        for (POPackage pkg : pkgs) {            
+            if (pkg instanceof POMultiQueryPackage) {
+                POMultiQueryPackage mpkg = (POMultiQueryPackage)pkg;
+                index = setBaseIndexOnPackage(index, mpkg);
+            } else {
+                index++;
+            }
+        }
+        return index;
+    }
+    
+    private void mergeOneReducePlanWithIndex(PhysicalPlan from, 
+            PhysicalPlan to, int initial, int current) throws VisitorException {                    
+        POPackage pk = (POPackage)from.getRoots().get(0);
+        from.remove(pk);
+ 
+        // XXX the index of the original keyInfo map is always 0;
+        // we need to shift the index so that the lookup works
+        // with the new indexed key       
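+        // e.g. a plan merged at initial index 2 gets its keyInfo entry
+        // at key 0 duplicated under the multi-query key (byte)(2 | 0x80)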
+        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pk.getKeyInfo();
+        if (keyInfo != null && keyInfo.size() > 0) {      
+            byte b = (byte)(initial | 0x80);
+            keyInfo.put(new Integer(b), keyInfo.get(0));
+        }     
+        
+        if (pk instanceof POMultiQueryPackage) {
+            POMultiQueryPackage mpkg = (POMultiQueryPackage)pk;
+            setBaseIndexOnPackage(initial, mpkg);
+        }
+                                
+        PhysicalOperator root = from.getRoots().get(0);
+        if (root instanceof PODemux) {
+            PODemux demux = (PODemux)root;
+            setBaseIndexOnDemux(initial, demux);
+        }
+                    
+        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);        
+        for (int i=initial; i<current; i++) {
+            pkg.addPackage(pk);
+        }
+        
+        PODemux demux = (PODemux)to.getLeaves().get(0);
+        for (int i=initial; i<current; i++) {
+            demux.addPlan(from);
+        }
+               
+        if (demux.isSameMapKeyType()) {
+            pkg.setKeyType(pk.getKeyType());
+        } else {
+            pkg.setKeyType(DataType.TUPLE);
+        }                
+    }
+    
+    private void mergeOneCombinePlanWithIndex(PhysicalPlan from,
+            PhysicalPlan to, int initial, int current) throws VisitorException {
+        POPackage cpk = (POPackage)from.getRoots().get(0);
+        from.remove(cpk);
+       
+        if (cpk instanceof POMultiQueryPackage) {
+            POMultiQueryPackage mpkg = (POMultiQueryPackage)cpk;
+            setBaseIndexOnPackage(initial, mpkg);
+        }
+        
+        PODemux demux = (PODemux)to.getLeaves().get(0);
+        
+        boolean isSameKeyType = demux.isSameMapKeyType();
+        
+        PhysicalOperator leaf = from.getLeaves().get(0);
+        if (leaf instanceof POLocalRearrange) {
+            POLocalRearrange clr = (POLocalRearrange)leaf;
+            try {
+                clr.setMultiQueryIndex(initial);            
+            } catch (ExecException e) {                                        
+                int errCode = 2136;
+                String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+            
+            // change the map key type to tuple when 
+            // multiple splittees have different map key types
+            if (!isSameKeyType) {
+                clr.setKeyType(DataType.TUPLE);
+            }
+        } else if (leaf instanceof PODemux) {
+            PODemux locDemux = (PODemux)leaf;
+            setBaseIndexOnDemux(initial, locDemux);
+        } 
+       
+        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);     
+        for (int i=initial; i<current; i++) {
+            pkg.addPackage(cpk);
+        }
+        
+        // all packages should have the same key type
+        if (!isSameKeyType) {
+            cpk.setKeyType(DataType.TUPLE);          
+        } 
+        
+        pkg.setKeyType(cpk.getKeyType());
+                
+        for (int i=initial; i<current; i++) {
+            demux.addPlan(from);
+        }
+    }
+    
+    private boolean needCombiner(List<MapReduceOper> mapReducers) {
+        boolean needCombiner = false;
+        for (MapReduceOper mrOp : mapReducers) {
+            if (!mrOp.combinePlan.isEmpty()) {
+                needCombiner = true;
+                break;
+            }
+        }
+        return needCombiner;
+    }
+       
+    private PhysicalPlan createDemuxPlan(boolean sameKeyType, boolean isCombiner) 
+        throws VisitorException {
+        PODemux demux = getDemux(sameKeyType, isCombiner);
+        POMultiQueryPackage pkg = getMultiQueryPackage();
+        
+        PhysicalPlan pl = new PhysicalPlan();
+        pl.add(pkg);
+        try {
+            pl.addAsLeaf(demux);
+        } catch (PlanException e) {                   
+            int errCode = 2137;
+            String msg = "Internal Error. Unable to add demux to the plan as leaf for optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+        return pl;
+    }
+
+    private void mergeAllMapReduceSplittees(List<MapReduceOper> mergeList, 
+            MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+       
+        boolean sameKeyType = hasSameMapKeyType(mergeList);
+        
+        log.info("Splittees have the same key type: " + sameKeyType);
+        
+        // create a new reduce plan that will be the container
+        // for the multiple reducer plans of the MROpers in the mergeList
+        PhysicalPlan redPl = createDemuxPlan(sameKeyType, false);
+        
+        // create a new combine plan that will be the container
+        // for the multiple combiner plans of the MROpers in the mergeList                
+        PhysicalPlan comPl = needCombiner(mergeList) ? 
+                createDemuxPlan(sameKeyType, true) : null;
+
+        log.info("Splittees have combiner: " + (comPl != null));
+                
+        int index = 0;             
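+        // 'index' is the next unassigned multi-query index; each merged
+        // splittee claims the range [index, incIndex) across its map,
+        // combine and reduce plans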
+        
+        for (MapReduceOper mrOp : mergeList) {
+
+            // merge the map plan            
+            int incIndex = mergeOneMapPlanWithIndex(
+                    mrOp.mapPlan, splitOp, index, sameKeyType);
+                       
+            // merge the combiner plan
+            if (comPl != null) {
+                if (!mrOp.combinePlan.isEmpty()) {                    
+                    mergeOneCombinePlanWithIndex(
+                            mrOp.combinePlan, comPl, index, incIndex);
+                } else {         
+                    int errCode = 2141;
+                    String msg = "Internal Error. Cannot merge non-combiner with combiners for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG);
+                }
+            }
+
+            // merge the reducer plan
+            mergeOneReducePlanWithIndex(
+                    mrOp.reducePlan, redPl, index, incIndex);
+           
+            index = incIndex;
+            
+            log.info("Merged MR job " + mrOp.getOperatorKey().getId() 
+                    + " into MR job " + splitter.getOperatorKey().getId());
+        }
+
+        PhysicalPlan splitterPl = splitter.mapPlan;
+        PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+        PhysicalOperator storeOp = leaf;
+        List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);  
+ 
+        // replace store operator in the splitter with split operator
+        if (leaf instanceof POStore) {                            
+            splitOp.setInputs(storePreds);
+            try {
+                splitterPl.replace(storeOp, splitOp);
+            } catch (PlanException e) {                   
+                int errCode = 2132;
+                String msg = "Internal Error. Unable to replace store with split operator for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+        }     
+        
+        splitter.setMapDone(true);
+        splitter.reducePlan = redPl;
+        splitter.setReduceDone(true);
+           
+        if (comPl != null) {
+            splitter.combinePlan = comPl;        
+        }
+        
+        for (MapReduceOper mrOp : mergeList) {
+            removeAndReconnect(mrOp, splitter);
+        }
+       
+        splitter.mapKeyType = sameKeyType ?
+                mergeList.get(0).mapKeyType : DataType.TUPLE;         
+                
+        log.info("Requested parallelism of splitter: " 
+                + splitter.getRequestedParallelism());               
     }
     
     private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce, 
@@ -270,13 +690,8 @@
         PhysicalPlan pl = mapReduce.mapPlan;
         PhysicalOperator load = pl.getRoots().get(0);
         pl.remove(load);
-        try {
-            splitOp.addPlan(pl);
-        } catch (PlanException e) {
-            int errCode = 2130;
-            String msg = "Internal Error. Unable to merge split plans for optimization.";
-            throw new OptimizerException(msg, errCode, PigException.BUG, e);
-        }
+        
+        splitOp.addPlan(pl);
                               
         splitter.setMapDone(true);
         splitter.reducePlan = mapReduce.reducePlan;
@@ -301,6 +716,7 @@
     /**
      * Removes the specified MR operator from the plan after the merge. 
      * Connects its predecessors and successors to the merged MR operator
+     * 
      * @param mr the MR operator to remove
      * @param newMR the MR operator to be connected to the predecessors and 
      *              the successors of the removed operator
@@ -385,12 +801,21 @@
     }
     
     private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
-        List<PhysicalOperator> roots = pl.getRoots();
-        return (roots.size() == 1);
+        return (pl.getRoots().size() == 1);
     }
     
     private POSplit getSplit(){
-        POSplit sp = new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
-        return sp;
+        return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
     } 
+    
+    private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){
+        PODemux demux = new PODemux(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        demux.setSameMapKeyType(sameMapKeyType);
+        demux.setInCombiner(inCombiner);
+        return demux;
+    } 
+    
+    private POMultiQueryPackage getMultiQueryPackage(){
+        return new POMultiQueryPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+    }  
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
index 660e965..6877bfb 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
@@ -93,6 +93,12 @@
         super.visitSplit(spl);
         spl.setParentPlan(parent);
     }
+    
+    @Override
+    public void visitDemux(PODemux demux) throws VisitorException{
+        super.visitDemux(demux);
+        demux.setParentPlan(parent);
+    }
 
     @Override
     public void visitDistinct(PODistinct distinct) throws VisitorException {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
index e399cf5..d336653 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
@@ -80,6 +80,10 @@
     public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
         //do nothing
     }
+ 
+    public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
+        //do nothing
+    }
     
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         List<PhysicalPlan> inpPlans = nfe.getInputPlans();
@@ -103,6 +107,15 @@
         }
     }
 
+    public void visitDemux(PODemux demux) throws VisitorException{
+        List<PhysicalPlan> plans = demux.getPlans();
+        for (PhysicalPlan plan : plans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+            popWalker();
+        }
+    }
+    
 	public void visitDistinct(PODistinct distinct) throws VisitorException {
         //do nothing		
 	}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
index d2c6ebc..ad7f9ed 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
@@ -163,6 +163,15 @@
           else if (node instanceof POSplit) {
               sb.append(planString(((POSplit)node).getPlans()));
           }
+          else if (node instanceof PODemux) {
+              sb.append(planString(((PODemux)node).getPlans()));
+          }
+          else if (node instanceof POMultiQueryPackage) {
+              List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
+              for (POPackage pkg : pkgs) {
+                  sb.append(LSep + pkg.name() + "\n");
+              }
+          }
           else if(node instanceof POFRJoin){
             POFRJoin frj = (POFRJoin)node;
             List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
new file mode 100644
index 0000000..9e90c3b
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The MapReduce Demultiplexer operator.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization. 
+ * The reduce physical plans of the splittees become the inner plans 
+ * of this operator.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another demux operator.
+ * <p>
+ * The predecessor of this operator must be a POMultiQueryPackage
+ * operator which passes the index (indicating which inner reduce plan to run)
+ * along with other data to this operator.    
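+ * <p>
+ * For example (with hypothetical indices), after two splittees are merged
+ * the reduce pipeline becomes POMultiQueryPackage followed by PODemux, and
+ * a record whose key carries multi-query index 1 is routed to the second
+ * inner plan.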
+ */
+public class PODemux extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;    
+    
+    private static final int idxPart = 0x7F;
+    
+    private static Result empty = new Result(POStatus.STATUS_NULL, null);
+    
+    private static Result eop = new Result(POStatus.STATUS_EOP, null);
+    
+    private Log log = LogFactory.getLog(getClass());
+    
+    /*
+     * The base index of this demux. In the case of
+     * a demux contained in another demux, the index
+     * passed in must be shifted before it can be used.
+     */
+    private int baseIndex = 0;
+    
+    /*
+     * The list of inner plans this demux dispatches to
+     */
+    private ArrayList<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
+           
+    /*
+     * Flag indicating when a new pull should start 
+     */
+    private boolean getNext = true;
+    
+    /*
+     * Flag indicating whether the incoming input has been exhausted.
+     * It's used only when processing the call from the reducer's 
+     * close() method in the streaming case.
+     */
+    private boolean inpEOP = false;
+    
+    /*
+     * The leaf of the current pipeline
+     */
+    private PhysicalOperator curLeaf = null;
+    
+    /*
+     * Indicates whether all the inner plans have the same
+     * map key type. If not, the keys passed in are 
+     * wrapped inside tuples and need to be extracted
+     * during the reduce phase 
+     */
+    private boolean sameMapKeyType = true;
+    
+    /*
+     * Indicates whether this operator is in a combiner. 
+     * If not, this operator is in a reducer and the key
+     * values must first be extracted from the wrapping tuple
+     * before being written out to disk
+     */
+    private boolean inCombiner = false;
+    
+    BitSet processedSet = new BitSet();
+    
+    /**
+     * Constructs an operator with the specified key.
+     * 
+     * @param k the operator key
+     */
+    public PODemux(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    /**
+     * Constructs an operator with the specified key
+     * and degree of parallelism.
+     *  
+     * @param k the operator key
+     * @param rp the degree of parallelism requested
+     */
+    public PODemux(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    /**
+     * Constructs an operator with the specified key and inputs.
+     *  
+     * @param k the operator key
+     * @param inp the inputs that this operator will read data from
+     */
+    public PODemux(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    /**
+     * Constructs an operator with the specified key,
+     * degree of parallelism and inputs.
+     * 
+     * @param k the operator key
+     * @param rp the degree of parallelism requested 
+     * @param inp the inputs that this operator will read data from
+     */
+    public PODemux(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitDemux(this);
+    }
+
+    @Override
+    public String name() {
+        return "Demux - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Sets the base index of this demux. 
+     * 
+     * @param idx the base index
+     */
+    public void setBaseIndex(int idx) {
+        baseIndex = idx;
+    }
+    
+    /**
+     * Returns the base index of this demux
+     * 
+     * @return the base index
+     */
+    public int getBaseIndex() {
+        return baseIndex;
+    }
+    
+    /**
+     * Returns the list of inner plans.
+     *  
+     * @return the list of the nested plans
+     */
+    public List<PhysicalPlan> getPlans() {
+        return myPlans;
+    }
+    
+    /**
+     * Appends the specified plan at the end of the list.
+     * 
+     * @param inPlan plan to be appended to the inner plan list
+     */
+    public void addPlan(PhysicalPlan inPlan) {  
+        myPlans.add(inPlan);
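+        // mark the new plan as processed so that the first stream-close
+        // pass in getStreamCloseResult() begins by pulling fresh input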
+        processedSet.set(myPlans.size()-1);
+    }
+   
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        
+        if (!inCombiner && this.parentPlan.endOfAllInput) {
+            
+            // If there is a stream in one of the inner plans, 
+            // there could potentially be more to process - the 
+            // reducer sets the flag stating that all map input has 
+            // been sent already and runs the pipeline one more time
+            // in its close() call.
+            return getStreamCloseResult();                 
+        
+        } else { 
+        
+            if (getNext) {
+                
+                Result inp = processInput();
+                
+                if (inp.returnStatus == POStatus.STATUS_EOP) {
+                    return inp;
+                }
+             
+                curLeaf = attachInputWithIndex((Tuple)inp.result);
+    
+                getNext = false;
+            }
+            
+            return runPipeline(curLeaf);      
+        }
+    }
+    
+    private Result runPipeline(PhysicalOperator leaf) throws ExecException {
+       
+        Result res = null;
+        
+        while (true) {
+            
+            res = leaf.getNext(dummyTuple);
+            
+            if (res.returnStatus == POStatus.STATUS_OK ||
+                    res.returnStatus == POStatus.STATUS_EOP || 
+                    res.returnStatus == POStatus.STATUS_ERR) {                
+                break;
+            } else if (res.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            } 
+        }   
+        
+        if (res.returnStatus == POStatus.STATUS_EOP) {
+            getNext = true;
+        }
+        
+        return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
+    }
+
+    private Result getStreamCloseResult() throws ExecException {
+        Result res = null;
+       
+        while (true) {
+                       
+            if (processedSet.cardinality() == myPlans.size()) {
+                curLeaf = null;
+                Result inp = processInput();
+                if (inp.returnStatus == POStatus.STATUS_OK) {                
+                    attachInputWithIndex((Tuple)inp.result);
+                    inpEOP = false;
+                } else if (inp.returnStatus == POStatus.STATUS_EOP){
+                    inpEOP = true;
+                } else if (inp.returnStatus == POStatus.STATUS_NULL) {
+                    inpEOP = false;
+                } else if (inp.returnStatus == POStatus.STATUS_ERR) {
+                    return inp;
+                }            
+                processedSet.clear();
+            }
+            
+            int idx = processedSet.nextClearBit(0);
+            PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
+            
+            // a nested demux object is stored in multiple positions 
+            // of the inner plan list, corresponding to the indexes of 
+            // its inner plans; skip the object if it's already processed.
+            if (curLeaf != null && leaf.getOperatorKey().equals(curLeaf.getOperatorKey())) {
+                processedSet.set(idx++); 
+                if (idx < myPlans.size()) {
+                    continue;
+                } else {
+                    res = eop;
+                }
+            } else {
+                curLeaf = leaf;
+                res = leaf.getNext(dummyTuple);
+               
+                if (res.returnStatus == POStatus.STATUS_EOP)  {
+                    processedSet.set(idx++);        
+                    if (idx < myPlans.size()) {
+                        continue;
+                    }
+                } else {
+                    break;
+                }
+            
+            }
+            
+            if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) {
+                continue;
+            } else {
+                break;
+            }
+        }
+        
+        return res;              
+    }    
+    
+    private PhysicalOperator attachInputWithIndex(Tuple res) throws ExecException {
+        
+        // unwrap the key to get the wrapped value which
+        // is expected by the inner plans
+        PigNullableWritable key = (PigNullableWritable)res.get(0);        
+    
+        // choose an inner plan to run based on the index set by
+        // the POLocalRearrange operator and passed to this operator
+        // by POMultiQueryPackage
+        int index = key.getIndex();
+        index &= idxPart;
+        index -= baseIndex;                         
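+        // e.g. an index of (0x80 | 3) set by POLocalRearrange, with a
+        // baseIndex of 1, selects inner plan 2 once the multi-query
+        // bit is masked off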
+                           
+        PhysicalPlan pl = myPlans.get(index);
+        if (!(pl.getRoots().get(0) instanceof PODemux)) {                             
+            if (!sameMapKeyType && !inCombiner) {                                       
+                Tuple tup = (Tuple)key.getValueAsPigType();
+                res.set(0, tup.get(0));
+            } else {
+                res.set(0, key.getValueAsPigType());
+            }
+        }
+    
+        myPlans.get(index).attachInput(res);
+        return myPlans.get(index).getLeaves().get(0);
+    }
+    
+    /**
+     * Sets a flag indicating if all inner plans have 
+     * the same map key type. 
+     * 
+     * @param sameMapKeyType true if all inner plans have 
+     * the same map key type; otherwise false
+     */
+    public void setSameMapKeyType(boolean sameMapKeyType) {
+        this.sameMapKeyType = sameMapKeyType;
+    }
+
+    /**
+     * Returns a flag indicating if all inner plans 
+     * have the same map key type 
+     * 
+     * @return true if all inner plans have 
+     * the same map key type; otherwise false
+     */
+    public boolean isSameMapKeyType() {
+        return sameMapKeyType;
+    }
+
+    /**
+     * Sets a flag indicating if this operator is 
+     * in a combiner. 
+     * 
+     * @param inCombiner true if this operator is in
+     * a combiner; false if this operator is in a reducer
+     */
+    public void setInCombiner(boolean inCombiner) {
+        this.inCombiner = inCombiner;
+    }
+
+    /**
+     * Returns a flag indicating if this operator is 
+     * in a combiner.
+     * 
+     * @return true if this operator is in a combiner;
+     * otherwise this operator is in a reducer
+     */
+    public boolean isInCombiner() {
+        return inCombiner;
+    }
+        
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
index 24fd257..fc17e9b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
@@ -154,18 +154,41 @@
         return index;
     }
 
+    /**
+     * Sets the co-group index of this operator
+     * 
+     * @param index the position of this operator in 
+     * a co-group operation 
+     * @throws ExecException if the index value is greater than 0x7F
+     */
     public void setIndex(int index) throws ExecException {
-        if (index > 0x7F) {
-            int errCode = 1082;
-            String msg = "Cogroups with more than 127 inputs "
-                + " not supported.";
-            throw new ExecException(msg, errCode, PigException.INPUT);
-        } else {
-            this.index = (byte)index;
-        }
-        lrOutput.set(0, new Byte(this.index));
+        setIndex(index, false);
     }
 
+    /**
+     * Sets the multi-query index of this operator
+     * 
+     * @param index the position of the parent plan of this operator
+     * in the enclosing split operator
+     * @throws ExecException if the index value is greater than 0x7F
+     */
+    public void setMultiQueryIndex(int index) throws ExecException {
+        setIndex(index, true);
+    }
+    
+    private void setIndex(int index, boolean multiQuery) throws ExecException {
+        if (index > 0x7F) {
+            int errCode = 1082;
+            String msg = multiQuery ? 
+                    "Merging more than 127 map-reduce jobs is not supported."
+                  : "Cogroups with more than 127 inputs not supported.";
+            throw new ExecException(msg, errCode, PigException.INPUT);
+        } else {
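+            // the high bit tags the value as a multi-query index so that
+            // POMultiQueryPackage and PODemux can tell it apart from a
+            // co-group index; e.g. index 3 is stored as (byte)0x83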
+            this.index = multiQuery ? (byte)(index | 0x80) : (byte)index;
+        }            
+        lrOutput.set(0, new Byte(this.index));
+    }
+    
     public boolean isDistinct() { 
         return mIsDistinct;
     }
@@ -255,12 +278,26 @@
     protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
         //Construct key
         Object key;
+        
         if(resLst.size()>1){
             Tuple t = mTupleFactory.newTuple(resLst.size());
             int i=-1;
             for(Result res : resLst)
                 t.set(++i, res.result);
-            key = t;
+            key = t;           
+        } else if (resLst.size() == 1 && keyType == DataType.TUPLE) {
+            
+            // We get here after merging multiple jobs that have different
+            // map key types into a single job during multi-query optimization.
+            // If the key isn't a tuple, it must be wrapped in a tuple.
+            Object obj = resLst.get(0).result;
+            if (obj instanceof Tuple) {
+                key = (Tuple)obj;
+            } else {
+                Tuple t = mTupleFactory.newTuple(1);
+                t.set(0, resLst.get(0).result);
+                key = t;
+            }        
         }
         else{
             key = resLst.get(0).result;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
new file mode 100644
index 0000000..c3df038
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The package operator that packages the globally rearranged tuples 
+ * into the output format required by the multi-query demultiplexer.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization. 
+ * The package operators of the reduce plans of the splittees form an
+ * indexed package list inside this operator. When this operator 
+ * receives an input, it extracts the index from the key and calls the 
+ * corresponding package to get the output data.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another multi-query package operator.
+ * <p>
+ * The successor of this operator must be a PODemux operator which 
+ * knows how to consume the output of this operator.
+ */
+public class POMultiQueryPackage extends POPackage {
+    
+    private static final long serialVersionUID = 1L;
+    
+    private static final int idxPart = 0x7F;
+
+    private final Log log = LogFactory.getLog(getClass());
+    
+    private List<POPackage> packages = new ArrayList<POPackage>();
+
+    private PigNullableWritable myKey;
+    
+    private int baseIndex = 0;      
+
+    /**
+     * Constructs an operator with the specified key.
+     * 
+     * @param k the operator key
+     */
+    public POMultiQueryPackage(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    /**
+     * Constructs an operator with the specified key
+     * and degree of parallelism.
+     *  
+     * @param k the operator key
+     * @param rp the degree of parallelism requested
+     */
+    public POMultiQueryPackage(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    /**
+     * Constructs an operator with the specified key and inputs.
+     *  
+     * @param k the operator key
+     * @param inp the inputs that this operator will read data from
+     */
+    public POMultiQueryPackage(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    /**
+     * Constructs an operator with the specified key,
+     * degree of parallelism and inputs.
+     * 
+     * @param k the operator key
+     * @param rp the degree of parallelism requested 
+     * @param inp the inputs that this operator will read data from
+     */
+    public POMultiQueryPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @Override
+    public String name() {
+        return "MultiQuery Package - " + getOperatorKey().toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitMultiQueryPackage(this);
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+    
+    @Override
+    public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
+        tupIter = inp;
+        myKey = k;
+    }
+
+    @Override
+    public void detachInput() {
+        tupIter = null;
+        myKey = null;
+    }
+
+    /**
+     * Appends the specified package object to the end of 
+     * the package list.
+     * 
+     * @param pack package to be appended to the list
+     */
+    public void addPackage(POPackage pack) {
+        packages.add(pack);        
+    }
+
+    /**
+     * Returns the list of packages.
+     *  
+     * @return the list of the packages
+     */
+    public List<POPackage> getPackages() {
+        return packages;
+    }
+
+    /**
+     * Constructs the output tuple from the inputs.
+     * <p> 
+     * The output is consumed by the demultiplexer operator 
+     * (PODemux) in the format (key, {bag of tuples}) where the key 
+     * is an indexed WritableComparable, not the wrapped value as a pig type.
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        
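+        // Extract the plan index from the key: mask off the
+        // multi-query flag bit, then subtract the base index to
+        // get the position in this operator's package list.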
+        int index = myKey.getIndex();
+        index &= idxPart;
+        index -= baseIndex;
+        
+        if (index >= packages.size() || index < 0) {
+            int errCode = 2140;
+            String msg = "Invalid package index " + index 
+                + " should be in the range between 0 and " + packages.size();
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+               
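+        // Delegate to the package that handles this plan's keys; it
+        // consumes the attached tuple iterator and builds the
+        // (key, {bag of tuples}) result.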
+        POPackage pack = packages.get(index);
+
+        pack.attachInput(myKey, tupIter);
+
+        Result res = pack.getNext(t);
+        
+        Tuple tuple = (Tuple)res.result;
+
+        // replace the wrapped value in the key with the key itself;
+        // guard against a null result (e.g. a non-OK return status)
+        if (tuple != null) {
+            tuple.set(0, myKey);
+        }
+
+        return res;
+    }
+
+    /**
+     * Sets the base index of this operator: the index assigned to 
+     * the first package in the list. A key's index minus the base 
+     * index gives the position of the corresponding package.
+     * 
+     * @param baseIndex the base index of this operator
+     */
+    public void setBaseIndex(int baseIndex) {
+        this.baseIndex = baseIndex;
+    }
+
+    /**
+     * Returns the base index of this operator
+     * 
+     * @return the base index of this operator
+     */
+    public int getBaseIndex() {
+        return baseIndex;
+    }      
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
index d148f45..afba7d9 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
@@ -206,8 +206,18 @@
             while (tupIter.hasNext()) {
                 NullableTuple ntup = tupIter.next();
                 int index = ntup.getIndex();
-                Tuple copy = getValueTuple(ntup, index);            
-                dbs[index].add(copy);
+                Tuple copy = getValueTuple(ntup, index);  
+                
+                if (numInputs == 1) {
+                    
+                    // this is for multi-query merge where 
+                    // numInputs is always 1, but the index
+                    // (the position of the inner plan in the 
+                    // enclosing operator) may not be 0.
+                    dbs[0].add(copy);
+                } else {
+                    dbs[index].add(copy);
+                }
                 if(reporter!=null) reporter.progress();
             }
             
@@ -240,21 +250,15 @@
      // Need to make a copy of the value, as hadoop uses the same ntup
         // to represent each value.
         Tuple val = (Tuple)ntup.getValueAsPigType();
-        /*
-        Tuple copy = mTupleFactory.newTuple(val.size());
-        for (int i = 0; i < val.size(); i++) {
-            copy.set(i, val.get(i));
-        }
-        */
         
         Tuple copy = null;
         // The "value (val)" that we just got may not
         // be the complete "value". It may have some portions
         // in the "key" (look in POLocalRearrange for more comments)
         // If this is the case we need to stitch
-        // the "value" together.
-        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-            keyInfo.get(index);
+        // the "value" together.        
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = 
+            keyInfo.get(index);           
         boolean isProjectStar = lrKeyInfo.first;
         Map<Integer, Integer> keyLookup = lrKeyInfo.second;
         int keyLookupSize = keyLookup.size();
@@ -364,5 +368,4 @@
         this.distinct = distinct;
     }
 
-
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
index 773a376..e472265 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
@@ -32,11 +32,11 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
  * The MapReduce Split operator.
+ * <p>
  * The assumption here is that
  * the logical to physical translation
  * will create this dummy operator with
@@ -56,7 +56,7 @@
  * This is different than the existing implementation
  * where the POSplit writes to sidefiles after filtering
  * and then loads the appropriate file.
- * 
+ * <p>
  * The approach followed here is as good as the old
  * approach if not better in many cases because
 * of the availability of attachInput. An optimization
@@ -185,7 +185,7 @@
      * the nested input plan list
      * @param inPlan plan to be appended to the list
      */
-    public void addPlan(PhysicalPlan inPlan) throws PlanException {        
+    public void addPlan(PhysicalPlan inPlan) {        
         myPlans.add(inPlan);
         processedSet.set(myPlans.size()-1);
     }
diff --git a/src/org/apache/pig/impl/io/PigNullableWritable.java b/src/org/apache/pig/impl/io/PigNullableWritable.java
index 0495c94..00f0a95 100644
--- a/src/org/apache/pig/impl/io/PigNullableWritable.java
+++ b/src/org/apache/pig/impl/io/PigNullableWritable.java
@@ -36,12 +36,16 @@
  */
 public abstract class PigNullableWritable implements WritableComparable {
 
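+    // The high bit of the index byte marks a key produced by a job
+    // merged by the multi-query optimizer; the low 7 bits carry the
+    // index of the plan that produced the key.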
+    private static final byte mqFlag = (byte)0x80;
+    
+    private static final byte idxSpace = (byte)0x7F;
+    
     private boolean mNull;
 
     protected WritableComparable mValue;
 
     private byte mIndex;
 
     /**
      * Compare two nullable objects.  Step one is to check if either or both
      * are null.  If one is null and the other is not, then the one that is
@@ -54,12 +58,19 @@
      */
     public int compareTo(Object o) {
         PigNullableWritable w = (PigNullableWritable)o;
+
+        if ((mIndex & mqFlag) != 0) { // this is a multi-query index
+            
+            if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
+            else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
+        }
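+        // Equal plan indices fall through to the regular null-ness
+        // and value comparison below.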
+        
         if (!mNull && !w.mNull) {
             return mValue.compareTo(w.mValue);
         } else if (mNull && w.mNull) {
             // If they're both null, compare the indicies
-            if (mIndex < w.mIndex) return -1;
-            else if (mIndex > w.mIndex) return 1;
+            if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
+            else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
             else return 0;
         }
         else if (mNull) return -1; 
diff --git a/test/org/apache/pig/test/TestMultiQuery.java b/test/org/apache/pig/test/TestMultiQuery.java
index 8d6f369..c2ae3e0 100644
--- a/test/org/apache/pig/test/TestMultiQuery.java
+++ b/test/org/apache/pig/test/TestMultiQuery.java
@@ -181,6 +181,342 @@
     }
     
     @Test
+    public void testMultiQueryPhase3BaseCase() {
+
+        System.out.println("===== multi-query phase 3 base case =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");
+            myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");            
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryPhase3BaseCase2() {
+
+        System.out.println("===== multi-query phase 3 base case (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");
+            myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");            
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3WithoutCombiner() {
+
+        System.out.println("===== multi-query phase 3 without combiner =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");           
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryPhase3WithoutCombiner2() {
+
+        System.out.println("===== multi-query phase 3 without combiner (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");           
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryPhase3WithMixedCombiner() {
+
+        System.out.println("===== multi-query phase 3 with mixed combiner =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");            
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 2);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3WithMixedCombiner2() {
+
+        System.out.println("===== multi-query phase 3 with mixed combiner (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");            
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3WithDifferentMapDataTypes() {
+
+        System.out.println("===== multi-query phase 3 with different map datatypes =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid parallel 2;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by $1 parallel 3;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by $1 parallel 4;");
+            myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+
+    @Test
+    public void testMultiQueryPhase3WithDifferentMapDataTypes2() {
+
+        System.out.println("===== multi-query phase 3 with different map datatypes (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by $1;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by $1;");
+            myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3StreamingInReducer() {
+
+        System.out.println("===== multi-query phase 3 with streaming in reducer =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+            myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;");
+            myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+            myPig.registerQuery("B = group A3 by $2;");
+            myPig.registerQuery("C = foreach B generate flatten(A3);");
+            myPig.registerQuery("D = stream B through `cat`;");
+            myPig.registerQuery("store D into '/tmp/output1';");
+            myPig.registerQuery("E = group A4 by $2;");
+            myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+            myPig.registerQuery("store F into '/tmp/output2';");            
+            myPig.registerQuery("G = group A1 by $2;");
+            myPig.registerQuery("H = foreach G generate group, COUNT(A1);");          
+            myPig.registerQuery("store H into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 16);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 24);
+
+            checkMRPlan(pp, 1, 1, 2);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3StreamingInReducer2() {
+
+        System.out.println("===== multi-query phase 3 with streaming in reducer (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+            myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;");
+            myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+            myPig.registerQuery("B = group A3 by $2;");
+            myPig.registerQuery("C = foreach B generate flatten(A3);");
+            myPig.registerQuery("D = stream B through `cat`;");
+            myPig.registerQuery("store D into '/tmp/output1';");
+            myPig.registerQuery("E = group A4 by $2;");
+            myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+            myPig.registerQuery("store F into '/tmp/output2';");            
+            myPig.registerQuery("G = group A1 by $2;");
+            myPig.registerQuery("H = foreach G generate group, COUNT(A1);");          
+            myPig.registerQuery("store H into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }       
+    
+    @Test
     public void testMultiQueryWithPigMixL12() {
 
         System.out.println("===== multi-query with PigMix L12 =====");
@@ -207,7 +543,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 23);
 
-            checkMRPlan(pp, 1, 2, 3); 
+            checkMRPlan(pp, 1, 1, 1); 
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -695,7 +1031,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
 
-            checkMRPlan(pp, 1, 2, 3);
+            checkMRPlan(pp, 1, 1, 1);
             
         } catch (Exception e) {
             e.printStackTrace();