PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin (satishsaley via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1811015 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 99df4cb..09c360c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin (satishsaley via rohini)
+
 PIG-5306: REGEX_EXTRACT() logs every line that doesn't match (satishsaley via rohini)
 
 PIG-5298: Verify if org.mortbay.jetty is removable (nkollar via szita)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
index f18d47a..24a52d5 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -41,6 +42,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataBag;
@@ -53,6 +55,7 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
 
@@ -78,8 +81,6 @@
     // relation is also included in the count.
     private transient int relationCnt;
 
-    private transient TupleFactory mTupleFactory;
-
     private String indexFileName;
 
     private FuncSpec idxFuncSpec; 
@@ -113,7 +114,19 @@
         for(int i=0; i < lrs.length; i++)
             LRs[i].setStripKeyFromValue(false);
     }
-    
+
+    public POMergeCogroup(POMergeCogroup copy) {
+        super(copy);
+        this.sidFuncSpecs = copy.sidFuncSpecs;
+        this.sideFileSpecs = copy.sideFileSpecs;
+        this.LRs = copy.LRs;
+        this.indexFileName = copy.indexFileName;
+        this.idxFuncSpec = copy.idxFuncSpec;
+        this.loaderSignatures = copy.loaderSignatures;
+        this.endOfRecordMark = copy.endOfRecordMark;
+        this.counter = copy.counter;
+    }
+
     // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez.
     // This is because:
     // For MR, we send EOP at the end of every record
@@ -413,7 +426,7 @@
         }
     }
 
-    private List<Pair<Integer,Tuple>> readIndex() throws ExecException{
+    protected List<Pair<Integer, Tuple>> readIndex() throws ExecException {
 
         // Assertions on index we are about to read:
         // We are reading index from a file through POLoad which will return tuples.
@@ -438,20 +451,28 @@
         List<Pair<Integer,Tuple>> index = new ArrayList<Pair<Integer,Tuple>>();
 
         for(Result res = ld.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = ld.getNextTuple()){
-
-            Tuple  idxTuple = (Tuple)res.result;
-            int colCnt = idxTuple.size()-2;
-            Tuple keyTuple = mTupleFactory.newTuple(colCnt);
-
-            for (int i=0; i< colCnt; i++)
-                keyTuple.set(i, idxTuple.get(i));
-
-            index.add(new Pair<Integer, Tuple>((Integer)idxTuple.get(colCnt+1), keyTuple));
+            addTupleToIndex((Tuple) res.result, index);
         }
 
         return index;
     }
 
+    /**
+     * Separates out key tuple from given tuple and adds it to index.
+     *
+     * @param tuple
+     * @param index
+     * @throws ExecException
+     */
+    protected void addTupleToIndex(Tuple tuple, List<Pair<Integer, Tuple>> index) throws ExecException {
+        int colCnt = tuple.size() - 2;
+        Tuple keyTuple = mTupleFactory.newTuple(colCnt);
+        for (int i = 0; i < colCnt; i++) {
+            keyTuple.set(i, tuple.get(i));
+        }
+        index.add(new Pair<Integer, Tuple>((Integer) tuple.get(colCnt + 1), keyTuple));
+    }
+
     @SuppressWarnings("unchecked")
     private Comparable<Object> getFirstKeyOfNextSplit(final int curSplitIdx, final List<Pair<Integer,Tuple>> index) throws IOException{
 
@@ -548,7 +569,6 @@
     ClassNotFoundException, ExecException {
 
         is.defaultReadObject();
-        mTupleFactory = TupleFactory.getInstance();
         this.heap = new PriorityQueue<Tuple>(11, new Comparator<Tuple>() {
 
             @SuppressWarnings("unchecked")
@@ -622,4 +642,21 @@
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
+
+    @Override
+    public POMergeCogroup clone() throws CloneNotSupportedException {
+        POMergeCogroup clone = (POMergeCogroup) super.clone();
+        clone.sidFuncSpecs = new ArrayList<FuncSpec>();
+        for (FuncSpec f : this.sidFuncSpecs) {
+            clone.sidFuncSpecs.add(f.clone());
+        }
+        clone.sideFileSpecs = new ArrayList<String>(this.sideFileSpecs);
+        clone.LRs = new POLocalRearrange[this.LRs.length];
+        for (int i = 0; i < this.LRs.length; i++) {
+            clone.LRs[i] = this.LRs[i].clone();
+        }
+        clone.idxFuncSpec = this.idxFuncSpec.clone();
+        clone.loaderSignatures = new ArrayList<String>(this.loaderSignatures);
+        return clone;
+    }
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java
new file mode 100644
index 0000000..d8a1078
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.Pair;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+public class POMergeCogroupTez extends POMergeCogroup implements TezInput {
+
+    private static final Log LOG = LogFactory.getLog(POMergeJoinTez.class);
+    private static final long serialVersionUID = 1L;
+    private String inputKey;
+    private transient String cacheKey;
+    private transient KeyValueReader reader;
+    private transient List<Pair<Integer, Tuple>> index;
+
+    public POMergeCogroupTez(OperatorKey k, List<PhysicalOperator> inpPOs, POLocalRearrange[] lrs, int parallel) {
+        super(k, inpPOs, lrs, parallel);
+    }
+
+    public POMergeCogroupTez(POMergeCogroup copy) {
+        super(copy);
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { this.inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        this.inputKey = newInputKey;
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "mergecogrp-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            this.index = (LinkedList<Pair<Integer, Tuple>>) cacheValue;
+            return;
+        }
+
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+
+        try {
+            input = inputs.get(this.inputKey);
+            reader = (KeyValueReader) input.getReader();
+            LOG.info(
+                    "Attached input from vertex " + this.inputKey + " : input=" + input + ", reader=" + reader);
+            index = new LinkedList<>();
+            while (reader.next()) {
+                Tuple origTuple = (Tuple) reader.getCurrentValue();
+                Tuple copyTuple = mTupleFactory.newTuple(origTuple.getAll());
+                addTupleToIndex(copyTuple, index);
+            }
+            ObjectCache.getInstance().cache(cacheKey, this.index);
+        }
+        catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return super.name().replace("MergeCogroup", "MergeCogroupTez") + "\t<-\t " + this.inputKey;
+    }
+
+    @Override
+    protected List<Pair<Integer, Tuple>> readIndex() {
+        return this.index;
+    }
+
+    @Override
+    public POMergeCogroupTez clone() throws CloneNotSupportedException {
+        return (POMergeCogroupTez) super.clone();
+    }
+}
\ No newline at end of file
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
index 815a325..17d27b6 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
@@ -43,7 +43,6 @@
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.TupleMaker;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -55,6 +54,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 
 /** This operator implements merge join algorithm to do map side joins.
  *  Currently, only two-way joins are supported. One input of join is identified as left
@@ -82,21 +82,21 @@
     //The Local Rearrange operators modeling the join key
     private POLocalRearrange[] LRs;
 
-    private transient LoadFunc rightLoader;
+    protected transient LoadFunc rightLoader;
     private OperatorKey opKey;
 
-    private Object prevLeftKey;
+    private transient Object prevLeftKey;
 
-    private Result prevLeftInp;
+    private transient Result prevLeftInp;
 
-    private Object prevRightKey = null;
+    private transient Object prevRightKey = null;
 
-    private Result prevRightInp;
+    private transient Result prevRightInp;
 
     //boolean denoting whether we are generating joined tuples in this getNext() call or do we need to read in more data.
-    private boolean doingJoin;
+    private transient boolean doingJoin;
 
-    private FuncSpec rightLoaderFuncSpec;
+    protected FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
 
@@ -113,17 +113,17 @@
 
     private boolean noInnerPlanOnRightSide;
 
-    private Object curJoinKey;
+    private transient Object curJoinKey;
 
-    private Tuple curJoiningRightTup;
+    private transient Tuple curJoiningRightTup;
 
     private int counter; // # of tuples on left side with same key.
 
-    private int leftTupSize = -1;
+    private transient int leftTupSize;
 
-    private int rightTupSize = -1;
+    private transient int rightTupSize;
 
-    private int arrayListSize = 1024;
+    private static int ARRAY_LIST_SIZE = 1024;
 
     private LOJoin.JOINTYPE joinType;
 
@@ -147,10 +147,9 @@
     // Only for spark.
     // it means that current operator reaches at its end and the last left input was
     // added into 'leftTuples', ready for join.
-    private boolean leftInputConsumedInSpark = false;
+    private transient boolean leftInputConsumedInSpark = false;
 
     // This serves as the default TupleFactory
-    private transient TupleFactory mTupleFactory;
 
     /**
      * These TupleFactories are used for more efficient Tuple generation. This should
@@ -185,6 +184,25 @@
         this.mergedInputSchema = mergedInputSchema;
     }
 
+    public POMergeJoin(POMergeJoin copy) {
+        super(copy);
+        this.firstTime = copy.firstTime;
+        this.LRs = copy.LRs;
+        this.rightLoaderFuncSpec = copy.rightLoaderFuncSpec;
+        this.rightInputFileName = copy.rightInputFileName;
+        this.indexFile = copy.indexFile;
+        this.inpPlans = copy.inpPlans;
+        this.rightPipelineLeaf = copy.rightPipelineLeaf;
+        this.rightPipelineRoot = copy.rightPipelineRoot;
+        this.noInnerPlanOnRightSide = copy.noInnerPlanOnRightSide;
+        this.counter = copy.counter;
+        this.joinType = copy.joinType;
+        this.signature = copy.signature;
+        this.endOfRecordMark = copy.endOfRecordMark;
+        this.leftInputSchema = copy.leftInputSchema;
+        this.mergedInputSchema = copy.mergedInputSchema;
+    }
+
     /**
      * Configures the Local Rearrange operators to get keys out of tuple.
      * @throws ExecException
@@ -211,8 +229,6 @@
      * This is a helper method that sets up all of the TupleFactory members.
      */
     private void prepareTupleFactories() {
-        mTupleFactory = TupleFactory.getInstance();
-
         if (leftInputSchema != null) {
             leftTupleMaker = SchemaTupleBackend.newSchemaTupleFactory(leftInputSchema, false, GenContext.MERGE_JOIN);
         }
@@ -241,7 +257,7 @@
      * @return the list object to store Tuples in
      */
     private TuplesToSchemaTupleList newLeftTupleArray() {
-        return new TuplesToSchemaTupleList(arrayListSize, leftTupleMaker);
+        return new TuplesToSchemaTupleList(ARRAY_LIST_SIZE, leftTupleMaker);
     }
 
     /**
@@ -546,14 +562,8 @@
         }
     }
 
-    private void seekInRightStream(Object firstLeftKey) throws IOException{
-        rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
-
-        // check if hadoop distributed cache is used
-        if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
-            DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
-            loader.setIndexFile(indexFile);
-        }
+    private void seekInRightStream(Object firstLeftKey) throws IOException {
+        rightLoader = getRightLoader();
 
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
@@ -565,6 +575,23 @@
                 firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : mTupleFactory.newTuple(firstLeftKey));
     }
 
+    /**
+     * Instantiate right loader
+     *
+     * @return
+     * @throws IOException
+     * @throws ExecException
+     */
+    protected LoadFunc getRightLoader() throws ExecException, IOException {
+        LoadFunc loader = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+        // check if hadoop distributed cache is used
+        if (indexFile != null && loader instanceof DefaultIndexableLoader) {
+            DefaultIndexableLoader defLoader = (DefaultIndexableLoader) loader;
+            defLoader.setIndexFile(indexFile);
+        }
+        return loader;
+    }
+
     private Result getNextRightInp(Object leftKey) throws ExecException{
 
         /*
@@ -668,7 +695,6 @@
     private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
 
         is.defaultReadObject();
-        mTupleFactory = TupleFactory.getInstance();
     }
 
 
@@ -746,4 +772,26 @@
     public POLocalRearrange[] getLRs() {
         return LRs;
     }
+
+    @Override
+    public POMergeJoin clone() throws CloneNotSupportedException {
+        POMergeJoin clone = (POMergeJoin) super.clone();
+        clone.LRs = new POLocalRearrange[this.LRs.length];
+        for (int i = 0; i < this.LRs.length; i++) {
+            clone.LRs[i] = this.LRs[i].clone();
+        }
+        clone.rightLoaderFuncSpec = this.rightLoaderFuncSpec.clone();
+        clone.inpPlans = new MultiMap<PhysicalOperator, PhysicalPlan>();
+        for (PhysicalOperator op : this.inpPlans.keySet()) {
+            PhysicalOperator cloneOp = op.clone();
+            for (PhysicalPlan phyPlan : this.inpPlans.get(op)) {
+                clone.inpPlans.put(cloneOp, phyPlan.clone());
+            }
+        }
+        clone.rightPipelineLeaf = this.rightPipelineLeaf.clone();
+        clone.rightPipelineRoot = this.rightPipelineRoot.clone();
+        clone.leftInputSchema = this.leftInputSchema.clone();
+        clone.mergedInputSchema = this.mergedInputSchema.clone();
+        return clone;
+    }
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java
new file mode 100644
index 0000000..f2a5191
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.TezIndexableLoader;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+public class POMergeJoinTez extends POMergeJoin implements TezInput {
+
+    private static final Log LOG = LogFactory.getLog(POMergeJoinTez.class);
+    private static final long serialVersionUID = 1L;
+    private String inputKey;
+    private transient String cacheKey;
+    private transient KeyValueReader reader;
+    private LinkedList<Tuple> index;
+
+    public POMergeJoinTez(POMergeJoin joinOp) {
+        super(joinOp);
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { this.inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        this.inputKey = newInputKey;
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "merge-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            this.index = (LinkedList<Tuple>) cacheValue;
+            rightLoader = getRightLoader();
+            return;
+        }
+
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info(
+                    "Attached input from vertex " + this.inputKey + " : input=" + input + ", reader=" + reader);
+            this.index = new LinkedList<Tuple>();
+            while (reader.next()) {
+                Tuple origTuple = (Tuple) reader.getCurrentValue();
+                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                this.index.add(copy);
+            }
+            ObjectCache.getInstance().cache(cacheKey, this.index);
+            rightLoader = getRightLoader();
+        }
+        catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return super.name().replace("MergeJoin", "MergeJoinTez") + "\t<-\t " + this.inputKey;
+    }
+
+    @Override
+    protected LoadFunc getRightLoader() throws ExecException {
+        LoadFunc loader = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+        if (loader instanceof TezIndexableLoader) {
+            ((TezIndexableLoader) loader).setIndex(index);
+        }
+        return loader;
+    }
+
+    @Override
+    public POMergeJoinTez clone() throws CloneNotSupportedException {
+        return (POMergeJoinTez) super.clone();
+    }
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
index 79739e9..2da8629 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
@@ -72,7 +72,9 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroupTez;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoinTez;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
@@ -112,9 +114,9 @@
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.builtin.TezIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.NullableIntWritable;
@@ -925,7 +927,9 @@
     }
 
     @Override
-    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+    public void visitMergeCoGroup(POMergeCogroup poCoGroup) throws VisitorException {
+
+        POMergeCogroupTez poCoGrp = new POMergeCogroupTez(poCoGroup);
         if(compiledInputs.length < 2){
             int errCode=2251;
             String errMsg = "Merge Cogroup work on two or more relations." +
@@ -998,43 +1002,53 @@
 
             // Create new map-reduce operator for indexing job and then configure it.
             TezOperator indexerTezOp = getTezOp();
-            FileSpec idxFileSpec = getIndexingJob(indexerTezOp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
-            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
-            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+            configureIndexerOp(indexerTezOp, baseMROp, poCoGrp);
 
             baseMROp.plan.addAsLeaf(poCoGrp);
+            baseMROp.markMergeCogroup();
             for (FuncSpec funcSpec : funcSpecs)
                 baseMROp.UDFs.add(funcSpec.toString());
 
-            phyToTezOpMap.put(poCoGrp,baseMROp);
+            phyToTezOpMap.put(poCoGrp, baseMROp);
             // Going forward, new operators should be added in baseMRop. To make
             // sure, reset curMROp.
             curTezOp = baseMROp;
         }
-        catch (ExecException e){
-           throw new TezCompilerException(e.getDetailedMessage(),e.getErrorCode(),e.getErrorSource(),e);
+        catch (ExecException e) {
+            throw new TezCompilerException(e.getDetailedMessage(), e.getErrorCode(), e.getErrorSource(), e);
         }
-        catch (TezCompilerException mrce){
-            throw(mrce);
+        catch (TezCompilerException mrce) {
+            throw (mrce);
         }
         catch (CloneNotSupportedException e) {
             throw new TezCompilerException(e);
         }
-        catch(PlanException e){
+        catch (PlanException e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
             throw new TezCompilerException(msg, errCode, PigException.BUG, e);
         }
-        catch (IOException e){
+        catch (IOException e) {
             int errCode = 3000;
             String errMsg = "IOException caught while compiling POMergeCoGroup";
-            throw new TezCompilerException(errMsg, errCode,e);
+            throw new TezCompilerException(errMsg, errCode, e);
         }
     }
 
-    // Sets up the indexing job for map-side cogroups.
-    private FileSpec getIndexingJob(TezOperator indexerTezOp,
-            final TezOperator baseTezOp, final List<PhysicalPlan> mapperLRInnerPlans)
+    /**
+     * Sets up the indexing vertex for map-side cogroups.
+     * 
+     * @param indexerTezOp
+     * @param baseTezOp
+     * @param poCoGrp
+     * @throws TezCompilerException
+     * @throws PlanException
+     * @throws ExecException
+     * @throws IOException
+     * @throws CloneNotSupportedException
+     */
+    private void configureIndexerOp(TezOperator indexerTezOp,
+            final TezOperator baseTezOp, final POMergeCogroupTez poCoGrp)
         throws TezCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
 
         // First replace loader with  MergeJoinIndexer.
@@ -1054,7 +1068,7 @@
 
         String[] indexerArgs = new String[6];
         indexerArgs[0] = funcSpec.toString();
-        indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
+        indexerArgs[1] = ObjectSerializer.serialize((Serializable) poCoGrp.getLRInnerPlansOf(0));
         indexerArgs[3] = baseLoader.getSignature();
         indexerArgs[4] = baseLoader.getOperatorKey().scope;
         indexerArgs[5] = Boolean.toString(false); // we care for nulls.
@@ -1076,6 +1090,7 @@
         indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
 
         POLoad idxJobLoader = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        idxJobLoader.copyAliasFrom(baseLoader);
         idxJobLoader.setPc(pigContext);
         idxJobLoader.setIsTmpLoad(true);
         idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
@@ -1092,19 +1107,19 @@
         tezPlan.add(indexAggrOper);
         tezPlan.add(indexerTezOp);
         TezCompilerUtil.simpleConnectTwoVertex(tezPlan, indexerTezOp, indexAggrOper, scope, nig);
-        TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp);
-        indexAggrOper.segmentBelow = true;
-
         indexerTezOp.setRequestedParallelism(1); // we need exactly one reducer for indexing job.
         indexerTezOp.setDontEstimateParallelism(true);
 
-        POStore st = TezCompilerUtil.getStore(scope, nig);
-        FileSpec strFile = getTempFileSpec(pigContext);
-        st.setSFile(strFile);
-        indexAggrOper.plan.addAsLeaf(st);
+        // Convert the index as a broadcast input
+        POValueOutputTez indexAggrOperOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
+        indexAggrOper.plan.addAsLeaf(indexAggrOperOutput);
+        indexAggrOperOutput.addOutputKey(baseTezOp.getOperatorKey().toString());
+        indexAggrOper.markIndexer();
         indexAggrOper.setClosed(true);
-
-        return strFile;
+        TezEdgeDescriptor edge = new TezEdgeDescriptor();
+        TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+        TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp, edge);
+        poCoGrp.setInputKey(indexAggrOper.getOperatorKey().toString());
     }
 
     /** Since merge-join works on two inputs there are exactly two TezOper predecessors identified  as left and right.
@@ -1118,27 +1133,23 @@
      *                  in physical plan, that is yanked and set as inner plans of joinOp.
      *  2) LeftTezOper:  add the Join operator in it.
      *
-     *  We also need to segment the DAG into two, because POMergeJoin depends on the index file which loads with
-     *  DefaultIndexableLoader. It is possible to convert the index as a broadcast input, but that is costly
-     *  because a lot of logic is built into DefaultIndexableLoader. We can revisit it later.
      */
     @Override
     public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
-
         try{
-            if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+            if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2) {
                 int errCode=1101;
                 throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
             }
 
             curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0));
-
             TezOperator rightTezOpr = null;
             TezOperator rightTezOprAggr = null;
-            if(curTezOp.equals(compiledInputs[0]))
+            if (curTezOp.equals(compiledInputs[0])) {
                 rightTezOpr = compiledInputs[1];
-            else
+            } else {
                 rightTezOpr = compiledInputs[0];
+            }
 
             // We will first operate on right side which is indexer job.
             // First yank plan of the compiled right input and set that as an inner plan of right operator.
@@ -1220,9 +1231,11 @@
                     }
                 }
             } else {
+                joinOp = new POMergeJoinTez(joinOp);
                 LoadFunc loadFunc = rightLoader.getLoadFunc();
                 //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While
-                //this feature would be useful, the current implementation of DefaultIndexableLoader
+                // this feature would be useful, the current implementation of
+                // DefaultIndexableLoader
                 //is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
                 //for each call.  Some refactoring of this class is required - and then the check below could be removed.
                 if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
@@ -1265,32 +1278,37 @@
                 rightTezOprAggr.setRequestedParallelism(1); // we need exactly one task for indexing job.
                 rightTezOprAggr.setDontEstimateParallelism(true);
 
-                POStore st = TezCompilerUtil.getStore(scope, nig);
-                FileSpec strFile = getTempFileSpec(pigContext);
-                st.setSFile(strFile);
-                rightTezOprAggr.plan.addAsLeaf(st);
-                rightTezOprAggr.setClosed(true);
-                rightTezOprAggr.segmentBelow = true;
+                // Convert the index as a broadcast input
+                POValueOutputTez rightTezOprAggrOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                rightTezOprAggr.plan.addAsLeaf(rightTezOprAggrOutput);
+                rightTezOprAggrOutput.addOutputKey(curTezOp.getOperatorKey().toString());
 
-                // set up the DefaultIndexableLoader for the join operator
-                String[] defaultIndexableLoaderArgs = new String[5];
-                defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
-                defaultIndexableLoaderArgs[1] = strFile.getFileName();
-                defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
-                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
-                defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
-                joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+                TezEdgeDescriptor edge = new TezEdgeDescriptor();
+                TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+                TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp, edge);
+
+                ((POMergeJoinTez) joinOp).setInputKey(rightTezOprAggr.getOperatorKey().toString());
+                // set up the TezIndexableLoader for the join operator
+                String[] tezIndexableLoaderArgs = new String[3];
+                tezIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+                tezIndexableLoaderArgs[1] = joinOp.getOperatorKey().scope;
+                tezIndexableLoaderArgs[2] = origRightLoaderFileSpec.getFileName();
+                joinOp.setRightLoaderFuncSpec(
+                        (new FuncSpec(TezIndexableLoader.class.getName(), tezIndexableLoaderArgs)));
                 joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
-
-                joinOp.setIndexFile(strFile.getFileName());
                 udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
             }
 
+            if(joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+                curTezOp.markMergeSparseJoin();
+            } else {
+                curTezOp.markMergeJoin();
+            }
             // We are done with right side. Lets work on left now.
             // Join will be materialized in leftTezOper.
-            if(!curTezOp.isClosed()) // Life is easy
+            if (!curTezOp.isClosed()) {// Life is easy
                 curTezOp.plan.addAsLeaf(joinOp);
-
+            }
             else{
                 int errCode = 2022;
                 String msg = "Input plan has been closed. This is unexpected while compiling.";
@@ -1298,14 +1316,14 @@
             }
             if(rightTezOprAggr != null) {
                 rightTezOprAggr.markIndexer();
-                // We want to ensure indexing job runs prior to actual join job. So, connect them in order.
-                TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp);
             }
+
             phyToTezOpMap.put(joinOp, curTezOp);
             // no combination of small splits as there is currently no way to guarantee the sortness
             // of the combined splits.
             curTezOp.noCombineSmallSplits();
             curTezOp.UDFs.addAll(udfs);
+
         }
         catch(PlanException e){
             int errCode = 2034;
@@ -2661,5 +2679,6 @@
     private TezOperator getTezOp() {
         return new TezOperator(OperatorKey.genOpKey(scope));
     }
+
 }
 
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
index 6e99d39..da95f04 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
@@ -185,7 +185,13 @@
         // Indicate if this job constructs bloom filter
         BUILDBLOOM,
         // Indicate if this job applies bloom filter
-        FILTERBLOOM;
+        FILTERBLOOM,
+        // Indicate if this job is a merge join job
+        MERGE_JOIN,
+        // Indicate if this job is a merge sparse join job
+        MERGE_SPARSE_JOIN,
+        // Indicate if this job is a merge cogroup job
+        MERGE_COGROUP;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -393,6 +399,14 @@
         feature.set(OPER_FEATURE.COGROUP.ordinal());
     }
 
+    public boolean isMergeCogroup() {
+        return feature.get(OPER_FEATURE.MERGE_COGROUP.ordinal());
+    }
+
+    public void markMergeCogroup() {
+        feature.set(OPER_FEATURE.MERGE_COGROUP.ordinal());
+    }
+
     public boolean isRegularJoin() {
         return feature.get(OPER_FEATURE.HASHJOIN.ordinal());
     }
@@ -473,6 +487,22 @@
         feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
     }
 
+    public boolean isMergeJoin() {
+        return feature.get(OPER_FEATURE.MERGE_JOIN.ordinal());
+    }
+
+    public void markMergeJoin() {
+        feature.set(OPER_FEATURE.MERGE_JOIN.ordinal());
+    }
+
+    public boolean isMergeSparseJoin() {
+        return feature.get(OPER_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+    }
+
+    public void markMergeSparseJoin() {
+        feature.set(OPER_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {
diff --git a/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java b/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
index a4688e4..f6d95df 100644
--- a/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
+++ b/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
@@ -71,8 +71,8 @@
 
     private LoadFunc loader;
     // Index is modeled as FIFO queue and LinkedList implements java Queue interface.
-    private LinkedList<Tuple> index;
-    private FuncSpec rightLoaderFuncSpec;
+    protected LinkedList<Tuple> index;
+    protected FuncSpec rightLoaderFuncSpec;
 
     private String scope;
     private Tuple dummyTuple = null;
@@ -94,6 +94,9 @@
         this.inpLocation = inputLocation;
     }
 
+    public DefaultIndexableLoader() {
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void seekNear(Tuple keys) throws IOException{
@@ -113,16 +116,8 @@
         // there are multiple Join keys, the tuple itself represents
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
-        POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
 
-        Properties props = ConfigurationUtil.getLocalFSProperties();
-        PigContext pc = new PigContext(ExecType.LOCAL, props);
-        ld.setPc(pc);
-        index = new LinkedList<Tuple>();
-        for(Result res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple())
-            index.offer((Tuple) res.result);
-
-
+        loadIndex();
         Tuple prevIdxEntry = null;
         Tuple matchedEntry;
 
@@ -193,6 +188,22 @@
         initRightLoader(splitsAhead);
     }
 
+    /**
+     * Set indices as LinkedList from index file
+     *
+     * @throws ExecException
+     */
+    protected void loadIndex() throws ExecException {
+        POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
+
+        Properties props = ConfigurationUtil.getLocalFSProperties();
+        PigContext pc = new PigContext(ExecType.LOCAL, props);
+        ld.setPc(pc);
+        index = new LinkedList<Tuple>();
+        for (Result res = ld.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = ld.getNextTuple())
+            index.offer((Tuple) res.result);
+    }
+
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
         Properties properties = (Properties) ObjectSerializer
                 .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props"));
diff --git a/src/org/apache/pig/impl/builtin/TezIndexableLoader.java b/src/org/apache/pig/impl/builtin/TezIndexableLoader.java
new file mode 100644
index 0000000..09069e0
--- /dev/null
+++ b/src/org/apache/pig/impl/builtin/TezIndexableLoader.java
@@ -0,0 +1,34 @@
+package org.apache.pig.impl.builtin;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.tez.runtime.api.LogicalInput;
+
+public class TezIndexableLoader extends DefaultIndexableLoader {
+
+    public TezIndexableLoader(String loaderFuncSpec, String scope, String inputLocation) {
+        super(loaderFuncSpec, null, null, scope, inputLocation);
+    }
+
+    @Override
+    public void loadIndex() throws ExecException {
+        // no op
+    }
+
+    /**
+     * Loads indices from provided LinkedList
+     *
+     * @param index
+     * @throws ExecException
+     */
+    public void setIndex(LinkedList<Tuple> index) throws ExecException {
+            this.index = index;
+    }
+}
diff --git a/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java b/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
index 716f079..d9725df 100644
--- a/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
+++ b/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
@@ -290,6 +290,15 @@
                 if (tezOp.isLimit() || tezOp.isLimitAfterSort()) {
                     feature.set(PIG_FEATURE.LIMIT.ordinal());
                 }
+                if (tezOp.isMergeJoin()) {
+                    feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
+                }
+                if (tezOp.isMergeSparseJoin()) {
+                    feature.set(PIG_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+                }
+                if (tezOp.isMergeCogroup()) {
+                    feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());
+                }
                 try {
                     new FeatureVisitor(tezOp.plan, feature).visit();
                 } catch (VisitorException e) {
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld
new file mode 100644
index 0000000..15d9716
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld
@@ -0,0 +1,43 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-24	->	Tez vertex scope-32,
+Tez vertex scope-32	->	Tez vertex scope-22,
+Tez vertex scope-22
+
+Tez vertex scope-24
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-34	->	 scope-32
+|   |
+|   Project[tuple][*] - scope-33
+|
+|---a: Load(file:///tmp/input1:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','','a_1-0','scope','false')) - scope-31
+Tez vertex scope-32
+# Plan on vertex
+POValueOutputTez - scope-38	->	 [scope-22]
+|
+|---New For Each(true)[bag] - scope-37
+    |   |
+    |   Project[tuple][1] - scope-36
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-35
+Tez vertex scope-22
+# Plan on vertex
+c: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-21
+|
+|---c: MergeCogroupTez[tuple] - scope-20	<-	 scope-32
+    |
+    |---a: New For Each(false,false)[bag] - scope-7
+        |   |
+        |   Cast[int] - scope-2
+        |   |
+        |   |---Project[bytearray][0] - scope-1
+        |   |
+        |   Cast[int] - scope-5
+        |   |
+        |   |---Project[bytearray][1] - scope-4
+        |
+        |---a: Load(file:///tmp/input1:org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader) - scope-0
\ No newline at end of file
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld
new file mode 100644
index 0000000..5111019
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld
@@ -0,0 +1,51 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-30	->	Tez vertex scope-37,
+Tez vertex scope-37	->	Tez vertex scope-29,
+Tez vertex scope-29
+
+Tez vertex scope-30
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-39	->	 scope-37
+|   |
+|   Project[tuple][*] - scope-38
+|
+|---b: Load(file:///tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','eNq9V01sG0UUftk4ieukP2nTVgiFOjTApfKWn0NRiiAusTC4cVRboDqqlPF67Gy7uzOdnU3XPSB6gQMSXEACCSSEOPbEiSviwIUeQEJCnKre4QISCAmVN7O79tpxk6IEfFh7Zt6+3+9983z7F5jwBawy0SkQTqxNWuB2p9Ak1jXqtQqbpMUYL9CQWoG0mUe9ju2hyGbXty3iVEiXigJ3iOcX1uK9NVxB9BkzwGjAQVRUbS87TtnjgazABOMu4RJOV9CoGRk10ahpu9wx0YxjXgwcaV8kfCkU8MSQa0pKmyxUORVEMjFoMVuBnFsSzF1pdah/Hd6EMbTpvka7voTDlatki8RGUH8FptwKJVsUz46kziq2L/Ew41a5VjFegUn3EmNSrzIVOOTWWFsOm5nRu3WW3su66XXIMdsLo0JSdgtJ4HE4hqE0ZFzc0V7ot2eVm5H4K8TfxLOJqZ+//ubExvfjYJQg5zDSKhELE1OGA3JTUH+TOa2Qv/iS1jlzI4vPI+pnqH3KbjsYQyuLu6YdM5qq86tg2C1MtG8xTiUcjZKJ8h2zJoXtdZbCJKpFGYuhmct7AZ6gDlGHxEl8QhhWS0ysoL6+bzMNmLX9FSKcbp0K1/aIpK0GHLjmsRtezb5JGzCNiKzZLVr1nG4ZpjxWltT1K3ByALtrglnU9zEYCcdSARYZcyjxEDA5W8uphtBAWYc526+zIi2hp5J6tLUsBOlKMNYbCBfG9eEl6lMZy0+rHCMk24g8CdX14SaJE2RGCTKHEmQOJMhMejJJzxLqRzwgyOpdrjBvrBcR2T4qd4mEsyM70mGdvsZI1KzpLxWwDLhDL5JrVEjID7/fIpKY9Z6EaufKXgo+HM9AjSeIZQUu8o2uwbKUykarDHOCXg8wWNpaI4I4DnVs3y1Crp8IhC1xbKJLNoMLrUDCow8OByOf1FIJHxx00FvSoXVBLJWKp4bf5dRLqCUliHpmmbAxVAyRWRrNicopFsiUhRwngnoaWxLW9gQLTdhmmrDRj3GhAPHynhRf0ilVZV7YlT76tdMUh2wi4cmRAFSvmSnWQSZROTkIcB8/oYSxJueKGY/1mVG3mWLx8NYP8x9/Sz4dh7EyZHzsds1DYzcyMdFd2U9ALlaHirn83p3V2Z/+/siAcTSvQFKGSdZuY8MPoC7k9+OPWj4W6niKe3EtqkWKoYswI6gMhFeTRAZIbpNRBwySdbV5lVpYQm7EdG30rhx1Xoi57k5evPXjJ3/9itXDztsiTqDyqmM5pV7S6TX0Otqd3OGu2dMlgHaFImXmpS8BpGoVRz/8yQYcsv14G29kD2+AHNuiQl2X6jo4znvsXiSdalv3ud9Q9KGyVMN9h+pNPC5D1pdEyAvMwUa1mBO4HnbP8dQI0UOgaofmfke4sm2vHyqEKtUndMKn4+28WhyKOuYZrtolKRXoUkEYb51O6g79avabJQWEsidph4qj9z774o9b75wzVH/FQBBwpC+3GrhNKt6+/eH89Ad331Xe8f7sMaj99f3HwQXip0CQwdvBwp1VJlcpxbJXYLptU6dVi2/AZ4cJCLuIBcKikcBisiz1X0LqzLYDz6pxhbdHhhWU4iMlJihx1H3TmzWj6aEr6ZIuWWGgZAupkuVSBYvzpZZmuG1brxcSMkFuDNOVxSQ/PZTk3SNM5a8IGakvzOkW9S1hc1WO+NrMeMSl8e/eSHFq54Qiz+TwnwAJJOvgdCTUZJZv41SdX7M7ee1DPpJUipcVKZ4c8j9JcH9kxmH9gOUQ319NHFqHrJqHl0UH0XBsfftsWoHD0dSAEn17cyH+TRm+yZqBjTO6V0AHa6gTr3LspvBBo7Shts/qWqnHC+pxPhxVy/M7qjB7es5rPeG2/h3BsjA86v+r4xRHa7ujARgCfyiWV/tnBvD9+H5Qklo/p36FI4/ODVg8vXtHXX6Ijro50FHqx4WcepbUg+rZpLwLIhralnpooG2MRMTGjiou9/RsaD3/PyI2RicQEREGQv3DeeP9/PNzX125l5C+kSpg4iRKRn9y9m0GW/r8zInm753vvuyZ7bXNhvarePu3k39OZut3ewI5RMPIEsyHOyVgXo+hO73432QfMMX/ACAlM/I=','b_1-1','scope','true')) - scope-8
+Tez vertex scope-37
+# Plan on vertex
+POValueOutputTez - scope-43	->	 [scope-29]
+|
+|---New For Each(true)[bag] - scope-42
+    |   |
+    |   Project[tuple][1] - scope-41
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-40
+Tez vertex scope-29
+# Plan on vertex
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-28
+|
+|---d: New For Each(false,false,false)[bag] - scope-27
+    |   |
+    |   Project[int][0] - scope-21
+    |   |
+    |   Project[int][1] - scope-23
+    |   |
+    |   Project[int][3] - scope-25
+    |
+    |---c: MergeJoinTez[tuple] - scope-18	<-	 scope-37
+        |
+        |---a: New For Each(false,false)[bag] - scope-7
+            |   |
+            |   Cast[int] - scope-2
+            |   |
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[int] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
+            |
+            |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
\ No newline at end of file
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index f99d6f3..d1617e3 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -49,6 +49,8 @@
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
+import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader;
+import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -269,6 +271,30 @@
     }
 
     @Test
+    public void testMergeJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = join a by x, b by x using 'merge';" +
+                "d = foreach c generate a::x as x, y, z;" +
+                "store d into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld");
+    }
+
+    @Test
+    public void testMergeCogroup() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' using "+ DummyCollectableLoader.class.getName() +"() as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' using " + DummyIndexableLoader.class.getName()+"() as (x:int, z:int);" +
+                "c = cogroup a by x, b by x using 'merge';" +
+                "store c into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld");
+    }
+
+
+    @Test
     public void testBloomJoin() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x, y:int);" +
@@ -1429,6 +1455,7 @@
 
         String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim();
         String compiledPlanClean = Util.standardizeNewline(compiledPlan).trim();
+
         assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
                 TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
     }