PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1842768 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 16d4553..bd52721 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)
+
 PIG-5349: Log stderr output when shell command fail (knoguchi)
 
 PIG-3038: Support for Credentials for UDF,Loader and Storer (satishsaley via rohini)
diff --git a/src/docs/src/documentation/content/xdocs/perf.xml b/src/docs/src/documentation/content/xdocs/perf.xml
index 2b70365..d6fd012 100644
--- a/src/docs/src/documentation/content/xdocs/perf.xml
+++ b/src/docs/src/documentation/content/xdocs/perf.xml
@@ -1210,7 +1210,8 @@
 used to filter records of the other relations before doing a regular hash join.
 The amount of data sent to the reducers will be a lot less depending up on the numbers of records that are filtered on the map side.
 Bloom join is very useful in cases where the number of matching records between relations in a join are comparatively less
-compared to the total records allowing many to be filtered before the join.
+compared to the total records allowing many to be filtered before the join. Bloom join is also ideal for a right outer join
+with a smaller dataset on the right, which is not supported by replicated join.
 Before bloom join was added as a type of join, same functionality was achieved by users by using
 the <a href="func.html#bloom">builtin bloom udfs</a> which is not as efficient and required more lines of code as well.
 Currently bloom join is only implemented in Tez execution mode. Builtin bloom udfs have to be used for other execution modes.</p>
@@ -1287,9 +1288,10 @@
 <li>pig.bloomjoin.num.filters - The number of bloom filters that will be created. Default is 1 for map strategy and 11 for reduce strategy.</li>
 <li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
 A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).</li>
-<li>pig.bloomjoin.hash.functions - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
-<li>pig.bloomjoin.hash.types - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+<li>pig.bloomjoin.hash.type - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.functions - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
 Higher the number lower the false positives. Too high a value can increase the cpu time. Default value is 3.</li>
+<li>pig.bloomjoin.nocombiner - Set to true to turn off the combiner; useful when most of the keys are unique. Default is false.</li>
 </ul>
 </section>
 
diff --git a/src/docs/src/documentation/content/xdocs/udf.xml b/src/docs/src/documentation/content/xdocs/udf.xml
index 5052c98..9c4e076 100644
--- a/src/docs/src/documentation/content/xdocs/udf.xml
+++ b/src/docs/src/documentation/content/xdocs/udf.xml
@@ -186,7 +186,7 @@
         public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(sum(input));}
     }
     static public class Final extends EvalFunc&lt;Long&gt; {
-        public Tuple exec(Tuple input) throws IOException {return sum(input);}
+        public Long exec(Tuple input) throws IOException {return sum(input);}
     }
     static protected Long count(Tuple input) throws ExecException {
         Object values = input.get(0);
diff --git a/src/org/apache/pig/PigConfiguration.java b/src/org/apache/pig/PigConfiguration.java
index ac32a5b..1e3fde2 100644
--- a/src/org/apache/pig/PigConfiguration.java
+++ b/src/org/apache/pig/PigConfiguration.java
@@ -204,6 +204,10 @@
     public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
 
     /**
+     * Set to true to turn off the bloom join combiner; useful when most of the keys are unique. Default is false.
+     */
+    public static final String PIG_BLOOMJOIN_NOCOMBINER = "pig.bloomjoin.nocombiner";
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
index c082044..89106e1 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
@@ -26,10 +26,12 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -132,6 +134,12 @@
             super.visitLocalRearrange(lr);
         }
 
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            if (pkg.getPkgr() instanceof BloomPackager) {
+                endOfAllInputFlag = true;
+            }
+        }
         /**
          * @return if end of all input is present
          */
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
index 2da8629..26ec5d7 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
@@ -49,6 +49,7 @@
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -109,6 +110,7 @@
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.BloomFilterPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.WeightedRangePartitionerTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -119,6 +121,7 @@
 import org.apache.pig.impl.builtin.TezIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -134,6 +137,7 @@
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
 import org.apache.tez.runtime.library.output.UnorderedKVOutput;
@@ -1452,46 +1456,45 @@
 
         POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
         pkg.setNumInps(1);
-        BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);;
-        pkgr.setKeyType(DataType.INTEGER);
+        BloomPackager pkgr = new BloomPackager(createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
         pkg.setPkgr(pkgr);
         POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
         combineBloomOp.plan.addAsLeaf(pkg);
         combineBloomOp.plan.addAsLeaf(combineBloomOutput);
 
-        edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
-        edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
-
-        // Add combiner as well.
-        POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
-        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
-        combinerPkgr.setCombiner(true);
-        combinerPkgr.setKeyType(DataType.INTEGER);
-        pkg_c.setPkgr(combinerPkgr);
-        pkg_c.setNumInps(1);
-        edge.combinePlan.addAsLeaf(pkg_c);
-        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
-        prjKey.setResultType(DataType.INTEGER);
-        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
-        PhysicalPlan pp = new PhysicalPlan();
-        pp.add(prjKey);
-        clrInps.add(pp);
-        POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
-        clr.setOutputKey(combineBloomOpKey);
-        edge.combinePlan.addAsLeaf(clr);
-
         if (createBloomInMap) {
+            pkgr.setKeyType(DataType.INTEGER);
+            edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+            edge.setIntermediateOutputKeyComparatorClass(
+            PigWritableComparators.PigIntRawBytesComparator.class.getName());
+            // Add combiner as well. Each bloom filter is 1 MB by default. When there are
+            // 100s of mappers producing bloom filters, it is better to have the combiner
+            // on the reduce side.
+            POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+            pkg_c.setPkgr(new BloomPackager(createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType));
+            pkg_c.getPkgr().setKeyType(DataType.INTEGER);
+            pkg_c.setNumInps(1);
+            edge.combinePlan.addAsLeaf(pkg_c);
+            POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+            prjKey.setResultType(DataType.INTEGER);
+            List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+            PhysicalPlan pp = new PhysicalPlan();
+            pp.add(prjKey);
+            clrInps.add(pp);
+            POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+            clr.setOutputKey(combineBloomOpKey);
+            edge.combinePlan.addAsLeaf(clr);
             // No combiner needed on map as there will be only one bloom filter per map for each partition
             // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager
             edge.setCombinerInMap(false);
             edge.setCombinerInReducer(true);
         } else {
-            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+            pkgr.setKeyType(DataType.BYTEARRAY);
+            edge.setIntermediateOutputKeyClass(NullableBytesWritable.class.getName());
+            edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigBytesRawBytesComparator.class.getName());
+            edge.partitionerClass = BloomFilterPartitioner.class;
             // Do distinct of the keys on the map side to reduce data sent to reducers.
-            // In case of reduce, not adding a combiner and doing the distinct during reduce itself.
-            // If needed one can be added later
-            edge.setCombinerInMap(true);
-            edge.setCombinerInReducer(false);
+            edge.setNeedsDistinctCombiner(!conf.getBoolean(PigConfiguration.PIG_BLOOMJOIN_NOCOMBINER, false));
         }
 
         // Broadcast the final bloom filter to other inputs
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
index 7e3e325..1d6f784 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
@@ -20,7 +20,6 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Iterator;
 
 import org.apache.hadoop.util.bloom.BloomFilter;
@@ -38,32 +37,27 @@
 public class BloomPackager extends Packager {
 
     private static final long serialVersionUID = 1L;
+    private static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
+    private static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
 
     private boolean bloomCreatedInMap;
     private int vectorSizeBytes;
+    private int numBloomFilters;
     private int numHash;
     private int hashType;
-    private byte bloomKeyType;
-    private boolean isCombiner;
 
     private transient ByteArrayOutputStream baos;
-    private transient Iterator<Object> distinctKeyIter;
+    private transient BloomFilter[] bloomFilters;
+    private transient int nextFilterIdx;
 
-    public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+    public BloomPackager(boolean bloomCreatedInMap, int numBloomFilters, int vectorSizeBytes,
             int numHash, int hashType) {
         super();
         this.bloomCreatedInMap = bloomCreatedInMap;
         this.vectorSizeBytes = vectorSizeBytes;
         this.numHash = numHash;
         this.hashType = hashType;
-    }
-
-    public void setBloomKeyType(byte keyType) {
-        bloomKeyType = keyType;
-    }
-
-    public void setCombiner(boolean isCombiner) {
-        this.isCombiner = isCombiner;
+        this.numBloomFilters = numBloomFilters;
     }
 
     @Override
@@ -78,21 +72,26 @@
     @Override
     public Result getNext() throws ExecException {
         try {
+            if (bags == null) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
             if (bloomCreatedInMap) {
-                if (bags == null) {
-                    return new Result(POStatus.STATUS_EOP, null);
-                }
-                // Same function for combiner and reducer
                 return combineBloomFilters();
             } else {
-                if (isCombiner) {
-                    return getDistinctBloomKeys();
-                } else {
-                    if (bags == null) {
-                        return new Result(POStatus.STATUS_EOP, null);
-                    }
-                    return createBloomFilter();
+                if (parent.isEndOfAllInput()) {
+                    return retrieveBloomFilter();
                 }
+                if (!bags[0].iterator().hasNext()) {
+                    return new Result(POStatus.STATUS_EOP, null);
+                }
+                if (bloomFilters == null) { // init
+                    bloomFilters = new BloomFilter[numBloomFilters];
+                }
+                // Create the bloom filters from the keys
+                Tuple tup = bags[0].iterator().next();
+                addKeyToBloomFilter(key, (int) tup.get(0));
+                detachInput();
+                return RESULT_EMPTY;
             }
         } catch (IOException e) {
             throw new ExecException("Error while constructing final bloom filter", e);
@@ -116,28 +115,6 @@
         return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
     }
 
-    private Result createBloomFilter() throws IOException {
-        // We get a bag of keys. Create a bloom filter from them
-        // First do distinct of the keys. Not using DistinctBag as memory should not be a problem.
-        HashSet<Object> bloomKeys = new HashSet<>();
-        Iterator<Tuple> iter = bags[0].iterator();
-        while (iter.hasNext()) {
-            bloomKeys.add(iter.next().get(0));
-        }
-
-        Object partition = key;
-        detachInput(); // Free up the key and bags reference
-
-        BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
-        for (Object bloomKey: bloomKeys) {
-            Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
-            bloomFilter.add(k);
-        }
-        bloomKeys = null;
-        return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64);
-
-    }
-
     private Result getSerializedBloomFilter(Object partition,
             BloomFilter bloomFilter, int serializedSize) throws ExecException,
             IOException {
@@ -159,26 +136,28 @@
         return r;
     }
 
-    private Result getDistinctBloomKeys() throws ExecException {
-        if (distinctKeyIter == null) {
-            HashSet<Object> bloomKeys = new HashSet<>();
-            Iterator<Tuple> iter = bags[0].iterator();
-            while (iter.hasNext()) {
-                bloomKeys.add(iter.next().get(0));
-            }
-            distinctKeyIter = bloomKeys.iterator();
+    private void addKeyToBloomFilter(Object key, int partition) throws ExecException {
+        Key k = new Key(((DataByteArray)key).get());
+        BloomFilter filter = bloomFilters[partition];
+        if (filter == null) {
+            filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+            bloomFilters[partition] = filter;
         }
-        while (distinctKeyIter.hasNext()) {
-            Tuple res = mTupleFactory.newTuple(2);
-            res.set(0, key);
-            res.set(1, distinctKeyIter.next());
-
-            Result r = new Result();
-            r.result = res;
-            r.returnStatus = POStatus.STATUS_OK;
-            return r;
-        }
-        distinctKeyIter = null;
-        return new Result(POStatus.STATUS_EOP, null);
+        filter.add(k);
     }
-}
+
+    private Result retrieveBloomFilter() throws IOException  {
+        while (nextFilterIdx < numBloomFilters) {
+            if (bloomFilters[nextFilterIdx] != null) {
+                return getSerializedBloomFilter(nextFilterIdx, bloomFilters[nextFilterIdx++], vectorSizeBytes + 64);
+            } else {
+                nextFilterIdx++;
+            }
+        }
+        return RESULT_EOP;
+    }
+
+    public boolean isBloomCreatedInMap() {
+        return bloomCreatedInMap;
+    }
+}
\ No newline at end of file
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
index eb8a612..4045942 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
@@ -37,6 +37,7 @@
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -75,8 +76,7 @@
     private transient BloomFilter[] bloomFilters;
     private transient KeyValueWriter bloomWriter;
     private transient PigNullableWritable nullKey;
-    private transient Tuple bloomValue;
-    private transient NullableTuple bloomNullableTuple;
+    private transient NullableTuple[] bloomPartitions;
 
     public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
             boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
@@ -142,8 +142,7 @@
             throw new ExecException(e);
         }
         bloomFilters = new BloomFilter[numBloomFilters];
-        bloomValue = mTupleFactory.newTuple(1);
-        bloomNullableTuple = new NullableTuple(bloomValue);
+        bloomPartitions = new NullableTuple[numBloomFilters];
     }
 
     @Override
@@ -167,7 +166,7 @@
                             if (createBloomInMap) {
                                 addKeyToBloomFilter(keyObj);
                             } else {
-                                writeJoinKeyForBloom(keyObj);
+                                writeJoinKeyForBloom(keyObj, key);
                             }
                         } else if (skipNullKeys) {
                             // Inner join. So don't bother writing null key
@@ -225,21 +224,27 @@
         }
     }
 
-    private void writeJoinKeyForBloom(Object key) throws IOException {
-        int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
-        bloomValue.set(0, key);
-        bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+    private void writeJoinKeyForBloom(Object keyObj, PigNullableWritable key) throws IOException {
+        int partition = (keyObj.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+        if (bloomPartitions[partition] == null) {
+            Tuple tuple = mTupleFactory.newTuple(1);
+            tuple.set(0, partition);
+            bloomPartitions[partition] = new NullableTuple(tuple);
+        }
+        bloomWriter.write(new NullableBytesWritable(new DataByteArray(DataType.toBytes(keyObj, keyType))), bloomPartitions[partition]);
     }
 
     private void writeBloomFilters() throws IOException {
+        Tuple tuple = mTupleFactory.newTuple(1);
+        NullableTuple nTuple = new NullableTuple(tuple);
         ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64);
         for (int i = 0; i < bloomFilters.length; i++) {
             if (bloomFilters[i] != null) {
                 DataOutputStream dos = new DataOutputStream(baos);
                 bloomFilters[i].write(dos);
                 dos.flush();
-                bloomValue.set(0, new DataByteArray(baos.toByteArray()));
-                bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple);
+                tuple.set(0, new DataByteArray(baos.toByteArray()));
+                bloomWriter.write(new NullableIntWritable(i), nTuple);
                 baos.reset();
             }
         }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
index 99ebf1c..768558a 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
@@ -188,7 +188,7 @@
                 if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
                     this.parentPlan.endOfAllInput = true;
                 }
-                return RESULT_EOP;
+                return pkgr.getNext();
             }
 
             key = pkgr.getKey(min);
diff --git a/test/e2e/pig/tests/join.conf b/test/e2e/pig/tests/join.conf
index 97f6c05..a576ee0 100644
--- a/test/e2e/pig/tests/join.conf
+++ b/test/e2e/pig/tests/join.conf
@@ -28,7 +28,9 @@
             {
             # Tuple join key
             'num' => 1,
-            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+            'pig' => q\
+SET pig.bloomjoin.num.filters 5;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
 --c = filter a by age < 20;
 --d = filter b by age < 20;
@@ -171,7 +173,9 @@
             {
             # Tuple join key
             'num' => 1,
-            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+            'pig' => q\
+SET pig.bloomjoin.num.filters 5;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
 --c = filter a by age < 20;
 --d = filter b by age < 20;
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
index c767523..331197b 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
@@ -27,15 +27,11 @@
     |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14
 Tez vertex scope-50
 # Combine plan on edge <scope-48>
-Local Rearrange[tuple]{int}(false) - scope-55	->	 scope-50
-|   |
-|   Project[int][0] - scope-54
-|
-|---Package(BloomPackager)[tuple]{int} - scope-53
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-52	->	 [scope-46, scope-47]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-51
+|---Package(BloomPackager)[tuple]{bytearray} - scope-51
 Tez vertex scope-46
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26	<-	 scope-50	->	 scope-49
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
index de14a55..bfafaf0 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
@@ -28,15 +28,11 @@
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-42
 # Combine plan on edge <scope-39>
-Local Rearrange[tuple]{int}(false) - scope-47	->	 scope-42
-|   |
-|   Project[int][0] - scope-46
-|
-|---Package(BloomPackager)[tuple]{int} - scope-45
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-44	->	 [scope-40]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-43
+|---Package(BloomPackager)[tuple]{bytearray} - scope-43
 Tez vertex scope-40
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22	<-	 scope-42	->	 scope-41
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
index 72a9b16..d00163e 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
@@ -4,19 +4,19 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-45	->	Tez vertex group scope-58,Tez vertex group scope-59,
-Tez vertex scope-46	->	Tez vertex group scope-58,Tez vertex group scope-59,
-Tez vertex group scope-59	->	Tez vertex scope-52,
+Tez vertex scope-45	->	Tez vertex group scope-55,Tez vertex group scope-56,
+Tez vertex scope-46	->	Tez vertex group scope-55,Tez vertex group scope-56,
+Tez vertex group scope-56	->	Tez vertex scope-52,
 Tez vertex scope-52	->	Tez vertex scope-44,
 Tez vertex scope-44	->	Tez vertex scope-51,
-Tez vertex group scope-58	->	Tez vertex scope-51,
+Tez vertex group scope-55	->	Tez vertex scope-51,
 Tez vertex scope-51
 
 Tez vertex scope-45
 # Plan on vertex
-d: BuildBloom Rearrange[tuple]{int}(false) - scope-60	->	[ scope-51, scope-52]
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-57	->	[ scope-51, scope-52]
 |   |
-|   Project[int][0] - scope-61
+|   Project[int][0] - scope-58
 |
 |---b: New For Each(false,false)[bag] - scope-15
     |   |
@@ -31,9 +31,9 @@
     |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
 Tez vertex scope-46
 # Plan on vertex
-d: BuildBloom Rearrange[tuple]{int}(false) - scope-62	->	[ scope-51, scope-52]
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-59	->	[ scope-51, scope-52]
 |   |
-|   Project[int][0] - scope-63
+|   Project[int][0] - scope-60
 |
 |---c: New For Each(false,false)[bag] - scope-23
     |   |
@@ -46,25 +46,17 @@
     |   |---Project[bytearray][1] - scope-20
     |
     |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16
-Tez vertex group scope-59	<-	 [scope-45, scope-46]	->	 scope-52
+Tez vertex group scope-56	<-	 [scope-45, scope-46]	->	 scope-52
 # No plan on vertex group
 Tez vertex scope-52
 # Combine plan on edge <scope-45>
-Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Combine plan on edge <scope-46>
-Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-54	->	 [scope-44]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-53
+|---Package(BloomPackager)[tuple]{bytearray} - scope-53
 Tez vertex scope-44
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
@@ -82,7 +74,7 @@
     |   |---Project[bytearray][1] - scope-4
     |
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex group scope-58	<-	 [scope-45, scope-46]	->	 scope-51
+Tez vertex group scope-55	<-	 [scope-45, scope-46]	->	 scope-51
 # No plan on vertex group
 Tez vertex scope-51
 # Plan on vertex
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
index 928b9a5..2ae40ff 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
@@ -60,15 +60,11 @@
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
 Tez vertex scope-52
 # Combine plan on edge <scope-50>
-Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-54	->	 [scope-46]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-53
+|---Package(BloomPackager)[tuple]{bytearray} - scope-53
 Tez vertex scope-46
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
index a426025..3c120cc 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
@@ -29,18 +29,14 @@
     |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-21
 Tez vertex scope-62
 # Combine plan on edge <scope-60>
-Local Rearrange[tuple]{int}(false) - scope-67	->	 scope-62
-|   |
-|   Project[int][0] - scope-66
-|
-|---Package(BloomPackager)[tuple]{int} - scope-65
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-64	->	 [scope-54, scope-58]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-63
+|---Package(BloomPackager)[tuple]{bytearray} - scope-63
 Tez vertex scope-54
 # Plan on vertex
-a: Split - scope-68
+a: Split - scope-65
 |   |
 |   d: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-62	->	 scope-61
 |   |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
index c5a2000..f564349 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
@@ -11,7 +11,7 @@
 
 Tez vertex scope-49
 # Plan on vertex
-a: Split - scope-63
+a: Split - scope-60
 |   |
 |   a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-15
 |   |
@@ -48,15 +48,11 @@
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-57
 # Combine plan on edge <scope-49>
-Local Rearrange[tuple]{int}(false) - scope-62	->	 scope-57
-|   |
-|   Project[int][0] - scope-61
-|
-|---Package(BloomPackager)[tuple]{int} - scope-60
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-59	->	 [scope-53]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-58
+|---Package(BloomPackager)[tuple]{bytearray} - scope-58
 Tez vertex scope-53
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-57	->	 scope-56
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
index 1acbb13..e2a7adc 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
@@ -12,7 +12,7 @@
 
 Tez vertex scope-43
 # Plan on vertex
-a: Split - scope-58
+a: Split - scope-55
 |   |
 |   e: BuildBloom Rearrange[tuple]{int}(false) - scope-36	->	[ scope-51, scope-52]
 |   |   |
@@ -41,15 +41,11 @@
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-52
 # Combine plan on edge <scope-43>
-Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-54	->	 [scope-45, scope-47]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-53
+|---Package(BloomPackager)[tuple]{bytearray} - scope-53
 Tez vertex scope-45
 # Plan on vertex
 e: BloomFilter Rearrange[tuple]{int}(false) - scope-32	<-	 scope-52	->	 scope-51