PIG-4963: Add a Bloom join (rohini)
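
For readers unfamiliar with the technique, the idea behind the join can be sketched in plain Java. This is a toy in-memory illustration under stated assumptions, not the Tez implementation in this patch: the class BloomJoinSketch, the SimpleBloom filter and its double-hashing scheme are hypothetical names and choices made only for this example.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration of a bloom join (hypothetical; not Pig's implementation). */
class BloomJoinSketch {

    /** Minimal bloom filter: sets numHashes bit positions per key. */
    static final class SimpleBloom {
        private final BitSet bits;
        private final int numBits;
        private final int numHashes;

        SimpleBloom(int vectorSizeBytes, int numHashes) {
            this.numBits = vectorSizeBytes * 8;
            this.bits = new BitSet(numBits);
            this.numHashes = numHashes;
        }

        // Double hashing (an assumption for this sketch): h1 + i * h2 mod numBits.
        private int position(Object key, int i) {
            int h1 = key.hashCode();
            int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
            return Math.floorMod(h1 + i * h2, numBits);
        }

        void add(Object key) {
            for (int i = 0; i < numHashes; i++) {
                bits.set(position(key, i));
            }
        }

        /** May return true for keys never added (false positive), never false for added keys. */
        boolean mightContain(Object key) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(position(key, i))) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Inner join on column 0; each record is {key, value}. */
    static List<String[]> innerJoin(List<String[]> big, List<String[]> small,
            int vectorSizeBytes, int numHashes) {
        // 1. Build the filter (and a hash table) from the keys of the small relation.
        SimpleBloom bloom = new SimpleBloom(vectorSizeBytes, numHashes);
        Map<String, List<String[]>> smallByKey = new HashMap<>();
        for (String[] s : small) {
            bloom.add(s[0]);
            smallByKey.computeIfAbsent(s[0], k -> new ArrayList<>()).add(s);
        }
        // 2. "Map side": drop records of the big relation whose key is definitely absent.
        List<String[]> survivors = new ArrayList<>();
        for (String[] b : big) {
            if (bloom.mightContain(b[0])) {
                survivors.add(b);
            }
        }
        // 3. Regular hash join on the survivors; false positives are discarded here.
        List<String[]> out = new ArrayList<>();
        for (String[] b : survivors) {
            for (String[] s : smallByKey.getOrDefault(b[0], Collections.<String[]>emptyList())) {
                out.add(new String[] {b[0], b[1], s[1]});
            }
        }
        return out;
    }
}
```

Because the final hash join re-checks every key, false positives in the filter only cost extra shuffled records and never change the join result.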

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1780431 13f79535-47bb-0310-9956-ffa450edef68
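
As a rough guide for choosing pig.bloomjoin.vectorsize.bytes and pig.bloomjoin.hash.functions, the standard bloom filter approximation p = (1 - e^(-k*n/m))^k can be evaluated directly. The helper below is a hypothetical sketch (BloomSizing is not part of Pig), and it assumes n is the number of distinct keys that end up in a single filter.

```java
/** Back-of-the-envelope bloom filter sizing (hypothetical helper, not part of Pig). */
class BloomSizing {

    /**
     * Approximate false positive rate for a filter of m bits, k hash functions
     * and n distinct keys: (1 - e^(-k*n/m))^k.
     */
    static double falsePositiveRate(long vectorSizeBytes, int numHashes, long distinctKeys) {
        double m = vectorSizeBytes * 8.0; // bits in the vector
        return Math.pow(1.0 - Math.exp(-(double) numHashes * distinctKeys / m), numHashes);
    }

    /** Number of hash functions minimizing the rate above: k = (m/n) * ln 2, at least 1. */
    static int optimalNumHashes(long vectorSizeBytes, long distinctKeys) {
        double m = vectorSizeBytes * 8.0;
        return Math.max(1, (int) Math.round(m / distinctKeys * Math.log(2)));
    }
}
```

With the documented defaults (1048576 bytes and 3 hash functions), one million distinct keys in one filter give an estimated false positive rate of roughly 0.027 under this approximation.
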
diff --git a/CHANGES.txt b/CHANGES.txt
index d0a7454..d49cdf9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@
  
 IMPROVEMENTS
 
+PIG-4963: Add a Bloom join (rohini)
+
 PIG-3938: Add LoadCaster to EvalFunc (knoguchi)
 
 PIG-5105: Tez unit tests failing with "Argument list too long" (rohini)
diff --git a/src/docs/src/documentation/content/xdocs/basic.xml b/src/docs/src/documentation/content/xdocs/basic.xml
index b3a12c6..ea8b75b 100644
--- a/src/docs/src/documentation/content/xdocs/basic.xml
+++ b/src/docs/src/documentation/content/xdocs/basic.xml
@@ -6955,7 +6955,7 @@
    <table>
       <tr> 
             <td>
-               <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];  </p>
+               <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'bloom' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];  </p>
             </td>
          </tr> 
    </table></section>
@@ -7004,7 +7004,16 @@
                <p>Use to perform replicated joins (see <a href="perf.html#replicated-joins">Replicated Joins</a>).</p>
             </td>
          </tr>
-         
+
+         <tr>
+            <td>
+               <p>'bloom'</p>
+            </td>
+            <td>
+               <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+            </td>
+         </tr>
+
          <tr>
             <td>
                <p>'skewed'</p>
@@ -7142,7 +7151,7 @@
       <tr> 
             <td>
                <p>alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column 
-               [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];  </p>
+               [USING 'replicated' | 'bloom' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];  </p>
             </td>
          </tr> 
    </table>
@@ -7213,7 +7222,7 @@
             </td>
          </tr>
 
-  <tr>
+         <tr>
             <td>
                <p>USING</p>
             </td>
@@ -7230,8 +7239,18 @@
                <p>Only left outer join is supported for replicated joins.</p>
             </td>
          </tr>
-         
-                  <tr>
+
+         <tr>
+            <td>
+               <p>'bloom'</p>
+            </td>
+            <td>
+               <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+               <p>Full outer join is not supported for bloom joins.</p>
+            </td>
+         </tr>
+
+         <tr>
             <td>
                <p>'skewed'</p>
             </td>
@@ -7324,6 +7343,13 @@
 C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';
 </source>
 
+<p>This example shows a bloom right outer join.</p>
+<source>
+A = LOAD 'large';
+B = LOAD 'small';
+C= JOIN A BY $0 RIGHT, B BY $0 USING 'bloom';
+</source>
+
 <p>This example shows a skewed full outer join.</p>
 <source>
 A = LOAD 'studenttab' as (name, age, gpa);
diff --git a/src/docs/src/documentation/content/xdocs/perf.xml b/src/docs/src/documentation/content/xdocs/perf.xml
index 4163351..de9b694 100644
--- a/src/docs/src/documentation/content/xdocs/perf.xml
+++ b/src/docs/src/documentation/content/xdocs/perf.xml
@@ -1202,6 +1202,100 @@
 </section>
 <!-- END FRAGMENT REPLICATE JOINS-->
 
+<!-- BLOOM JOINS-->
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="bloom-joins">
+<title>Bloom Joins</title>
+<p>Bloom join is a special type of join where a bloom filter is constructed using the join keys of one relation and
+used to filter records of the other relations before doing a regular hash join.
+The amount of data sent to the reducers shrinks in proportion to the number of records that are filtered on the map side.
+Bloom join is very useful when the number of matching records between the relations in a join is small
+compared to the total number of records, allowing many to be filtered before the join.
+Before bloom join was added as a join type, users achieved the same functionality with
+the <a href="func.html#bloom">builtin bloom udfs</a>, which is less efficient and requires more lines of code.
+Currently bloom join is only implemented in Tez execution mode. Builtin bloom udfs have to be used for other execution modes.</p>
+
+<section>
+<title>Usage</title>
+<p>Perform a bloom join with the USING clause (see <a href="basic.html#join-inner">JOIN (inner)</a> and <a href="basic.html#join-outer">JOIN (outer)</a>).
+In this example, a large relation is joined with two smaller relations. Note that the large relation comes first followed by the smaller relations.
+The bloom filter is built from the join keys of the rightmost relation, which is small, and the filter is applied to the big and medium relations.
+None of the relations are required to fit into main memory. </p>
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+medium = LOAD 'medium_data' AS (m1,m2,m3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN big BY b1, medium BY m1, small BY s1 USING 'bloom';
+</source>
+
+<p>
+In the case of inner join and right outer join, the rightmost relation is used for building the bloom filter
+and users are expected to specify the smaller dataset as the rightmost relation.
+But in the case of left outer join, the leftmost relation is used for building the bloom filter and is expected to be the smaller dataset.
+This is because all records of the outer relation should be in the result and none of them can be filtered.
+If the left relation turns out to be the bigger dataset, building the bloom filter on it would not be as efficient.
+But it might still perform better than a regular join if it is able to filter a lot of records from the right relation.
+</p>
+
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN small BY s1 LEFT, big BY b1 USING 'bloom';
+</source>
+</section>
+
+<section>
+<title>Conditions</title>
+<ul>
+<li>Bloom join cannot be used with a FULL OUTER join.</li>
+<li>If the underlying data is sufficiently skewed, bloom join might not help. Skewed join can be considered for those cases.</li>
+</ul>
+</section>
+
+<section>
+<title>Tuning options</title>
+<p>
+There are multiple <a href="start.html#properties">pig properties</a> that can be configured to construct a more efficient bloom filter.
+See <a href="http://en.wikipedia.org/wiki/Bloom_filter">Bloom Filter</a> for a discussion of how to select the number of bits and the number of hash functions.
+An easier option is to search for "bloom filter calculator" in a search engine and use one of the online bloom filter calculators to arrive at the desired values.
+</p>
+<ul>
+<li>pig.bloomjoin.strategy - The valid values for this are 'map' and 'reduce'. Default value is map.
+Bloom join has two different kinds of implementations to be more efficient in different cases.
+In general, there is an extra reduce step in the DAG for construction of the bloom filter(s).
+<ul>
+<li>map - In each map, bloom filters are computed on the join keys partitioned by the hashcode of the key
+with pig.bloomjoin.num.filters number of partitions.
+Bloom filters for each partition from different maps are then combined in the reducers producing one bloom filter per partition.
+The default value of pig.bloomjoin.num.filters is 1 for this strategy and so usually only one bloom filter is created.
+This is efficient and fast if there is a small number of maps (&lt;10) and the number of distinct keys is not too high.
+It can be faster with a larger number of maps and even with bigger bloom vector sizes,
+ but the amount of data shuffled to the reducer for aggregation becomes huge, making it inefficient.</li>
+<li>reduce - Join keys are sent from the map to the reducer partitioned by hashcode of the key with
+pig.bloomjoin.num.filters number of partitions. In the reducers, one bloom filter is then computed per partition.
+The number of reducers is set equal to the number of partitions, allowing each bloom filter to be computed in parallel.
+The default value of pig.bloomjoin.num.filters is 11 for this strategy.
+This is efficient for larger datasets with a lot of maps or a very large bloom vector size.
+In this case the size of the keys sent to the reducer is smaller than that of the bloom filters that would otherwise be sent for aggregation, making it efficient.</li>
+</ul>
+</li>
+<li>pig.bloomjoin.num.filters - The number of bloom filters that will be created. Default is 1 for map strategy and 11 for reduce strategy.</li>
+<li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
+A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).</li>
+<li>pig.bloomjoin.hash.type - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.functions - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+The higher the number, the lower the false positives. Too high a value can increase the cpu time. Default value is 3.</li>
+</ul>
+</section>
+
+</section>
+<!-- END BLOOM JOINS-->
+
 <!-- +++++++++++++++++++++++++++++++ -->
 <!-- SKEWED JOINS-->
 <section id="skewed-joins">
diff --git a/src/org/apache/pig/PigConfiguration.java b/src/org/apache/pig/PigConfiguration.java
index 36adb2c..a58c382 100644
--- a/src/org/apache/pig/PigConfiguration.java
+++ b/src/org/apache/pig/PigConfiguration.java
@@ -134,6 +134,58 @@
     public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
 
     /**
+     * Bloom join has two different kinds of implementations.
+     * <ul>
+     * <li>map <br>
+     * In each map, bloom filters are computed on the join keys partitioned by
+     * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of
+     * partitions. Bloom filters from different maps are then combined in the
+     * reducer producing one bloom filter per partition. This is efficient and
+     * fast if there is a small number of maps (&lt;10) and the number of
+     * distinct keys is not too high. It can be faster with a larger number of
+     * maps and even with bigger bloom vector sizes, but the amount of data
+     * shuffled to the reducer for aggregation becomes huge, making it
+     * inefficient.</li>
+     * <li>reduce <br>
+     * Join keys are sent from the map to the reducer partitioned by hashcode of
+     * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One
+     * bloom filter is then created per partition. This is efficient for larger
+     * datasets with a lot of maps or a very large
+     * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case the size of the keys sent
+     * to the reducer is smaller than that of the bloom filters that would otherwise
+     * be sent for aggregation, making it efficient.</li>
+     * </ul>
+     * Default value is map.
+     */
+    public static final String PIG_BLOOMJOIN_STRATEGY = "pig.bloomjoin.strategy";
+
+    /**
+     * The number of bloom filters that will be created.
+     * Default is 1 for map strategy and 11 for reduce strategy.
+     */
+    public static final String PIG_BLOOMJOIN_NUM_FILTERS = "pig.bloomjoin.num.filters";
+
+    /**
+     * The size in bytes of the bit vector to be used for the bloom filter.
+     * A bigger vector size will be needed when the number of distinct keys is higher.
+     * Default value is 1048576 (1MB).
+     */
+    public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = "pig.bloomjoin.vectorsize.bytes";
+
+    /**
+     * The type of hash function to use. Valid values are jenkins and murmur.
+     * Default is murmur.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_TYPE = "pig.bloomjoin.hash.type";
+
+    /**
+     * The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+     * The higher the number, the lower the false positives. Too high a value can increase the cpu time.
+     * Default value is 3.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
+
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
index cacba40..ae8d357 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
@@ -1116,7 +1116,9 @@
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
-            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN
+                    || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+                // Bloom join is not implemented in mapreduce mode and falls back to regular join
                 curMROp.markRegularJoin();
             } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
index 550ff65..1611a8f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
@@ -37,6 +37,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -160,7 +161,7 @@
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from
             // POJoinPacakage.getNext()
-            if (pack.getPkgr() instanceof JoinPackager)
+            if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
index 9a70b81..c082044 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
@@ -21,14 +21,16 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -105,7 +107,7 @@
         public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
             endOfAllInputFlag = true;
         }
-        
+
         @Override
         public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
             endOfAllInputFlag = true;
@@ -122,6 +124,13 @@
             }
         }
 
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
+            if (lr instanceof POBuildBloomRearrangeTez) {
+                endOfAllInputFlag = true;
+            }
+            super.visitLocalRearrange(lr);
+        }
 
         /**
          * @return if end of all input is present
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
index 1ff1abd..0e35273 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
@@ -441,6 +441,10 @@
     public void reset() {
     }
 
+    public boolean isEndOfAllInput() {
+        return parentPlan.endOfAllInput;
+    }
+
     /**
      * @return PigProgressable stored in threadlocal
      */
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
index 8992b6e..805f21a 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
@@ -51,13 +51,13 @@
     protected DataBag[] bags;
 
     public static enum PackageType {
-        GROUP, JOIN
+        GROUP, JOIN, BLOOMJOIN
     };
 
     protected transient Illustrator illustrator = null;
 
     // The key being worked on
-    Object key;
+    protected Object key;
 
     // marker to indicate if key is a tuple
     protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@
     protected boolean isKeyCompound = false;
 
     // key's type
-    byte keyType;
+    protected byte keyType;
 
     // The number of inputs to this
     // co-group. 0 indicates a distinct, which means there will only be a
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 6e88528..5ed3ee3 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -479,7 +479,7 @@
                 POLocalRearrangeTez.class);
 
         for (POLocalRearrangeTez lr : lrs) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
                 setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
                 // In case of secondary key sort, main key type is the actual key type
@@ -540,26 +540,36 @@
 
         UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
         out.setUserPayload(payLoad);
+        in.setUserPayload(payLoad);
 
+        // Remove combiner and reset payload
         if (!combinePlan.isEmpty()) {
             boolean noCombineInReducer = false;
+            boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
             String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
-            if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+            if (edge.getCombinerInReducer() != null) {
+                noCombineInReducer = !edge.getCombinerInReducer();
+            } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
                 noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
             } else {
                 noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
             }
-            if (noCombineInReducer) {
+            if (noCombineInReducer || noCombineInMapper) {
                 log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
                 conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
                 conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
                 conf.unset("pig.combinePlan");
                 conf.unset("pig.combine.package");
                 conf.unset("pig.map.keytype");
-                payLoad = TezUtils.createUserPayloadFromConf(conf);
+                UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
+                if (noCombineInMapper) {
+                    out.setUserPayload(payLoadWithoutCombiner);
+                }
+                if (noCombineInReducer) {
+                    in.setUserPayload(payLoadWithoutCombiner);
+                }
             }
         }
-        in.setUserPayload(payLoad);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -717,7 +727,7 @@
                             PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
                         if (lr.isConnectedToPackage()
-                                && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+                                && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int) lr.getIndex(), inputKey);
                             if (isVertexGroup) {
                                 isMergedInput = true;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
index 6dc118f..c112528 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
@@ -32,10 +32,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.hash.Hash;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
@@ -44,8 +46,10 @@
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -82,7 +86,10 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -110,6 +117,7 @@
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -167,6 +175,10 @@
 
     private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
 
+    // Contains the inputs to operator like join, with the list maintaining the
+    // same order of join from left to right
+    private Map<TezOperator, List<TezOperator>> inputsMap;
+
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
     public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
     public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -175,6 +187,8 @@
     private boolean optimisticFileConcatenation = false;
     private List<String> readOnceLoadFuncs = null;
 
+    private Configuration conf;
+
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
     public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -184,6 +198,7 @@
         this.pigContext = pigContext;
 
         pigProperties = pigContext.getProperties();
+        conf = ConfigurationUtil.toConfiguration(pigProperties, false);
         splitsSeen = Maps.newHashMap();
         tezPlan = new TezOperPlan();
         nig = NodeIdGenerator.getGenerator();
@@ -197,6 +212,7 @@
         scope = roots.get(0).getOperatorKey().getScope();
         localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
         phyToTezOpMap = Maps.newHashMap();
+        inputsMap = Maps.newHashMap();
 
         fileConcatenationThreshold = Integer.parseInt(pigProperties
                 .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -894,6 +910,7 @@
     public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
         try {
             blocking();
+            inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs)));
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             if (op.isCross()) {
@@ -1340,6 +1357,9 @@
                 } else if (op.getNumInps() > 1) {
                     curTezOp.markCogroup();
                 }
+            } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+                curTezOp.markRegularJoin();
+                addBloomToJoin(op, curTezOp);
             }
         } catch (Exception e) {
             int errCode = 2034;
@@ -1348,6 +1368,132 @@
         }
     }
 
+    private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException {
+
+        List<TezOperator> inputs = inputsMap.get(curTezOp);
+        TezOperator buildBloomOp;
+        List<TezOperator> applyBloomOps = new ArrayList<>();
+
+        String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
+        boolean createBloomInMap = "map".equals(strategy);
+        if (!createBloomInMap && !strategy.equals("reduce")) {
+            throw new PlanException(new IllegalArgumentException(
+                    "Invalid value for "
+                            + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " -  "
+                            + strategy + ". Valid values are map and reduce"));
+        }
+        int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
+        int vectorSizeBytes =  conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
+        int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf);
+        int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
+
+        // We build bloom of the right most input and apply the bloom filter on the left inputs by default.
+        // But in case of left outer join we build bloom of the left input and use it on the right input
+        boolean[] inner = op.getPkgr().getInner();
+        boolean skipNullKeys = true;
+        if (inner[inner.length - 1]) {  // inner is ordered from right to left while inputs is ordered from left to right
+            buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input
+            for (int i = 0; i < (inner.length - 1); i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+            skipNullKeys = inner[0];
+        } else {
+            // Left outer join
+            skipNullKeys = false;
+            buildBloomOp = inputs.get(0); // Bloom filter is built from left most input
+            for (int i = 1; i < inner.length; i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+        }
+
+        // Add BuildBloom operator to the input
+        POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0);
+        POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
+        bbr.setSkipNullKeys(skipNullKeys);
+        buildBloomOp.plan.remove(lr);
+        buildBloomOp.plan.addAsLeaf(bbr);
+
+        // Add a new reduce vertex that will construct the final bloom filter
+        //    - by combining the bloom filters from the buildBloomOp input tasks in the map strategy
+        //    - or directly from the keys from the buildBloomOp input tasks in the reduce strategy
+        TezOperator combineBloomOp = getTezOp();
+        tezPlan.add(combineBloomOp);
+        combineBloomOp.markBuildBloom();
+        // Explicitly set the parallelism for the new vertex to number of bloom filters.
+        // Auto parallelism will bring it down based on the actual output size
+        combineBloomOp.setEstimatedParallelism(numBloomFilters);
+        // We don't want parallelism to be changed during the run by grace auto parallelism
+        // It will take the whole input size and estimate way higher
+        combineBloomOp.setDontEstimateParallelism(true);
+
+        String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
+        TezEdgeDescriptor edge = new TezEdgeDescriptor();
+        TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
+        bbr.setBloomOutputKey(combineBloomOpKey);
+
+
+        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
+        pkg.setNumInps(1);
+        BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+        pkgr.setKeyType(DataType.INTEGER);
+        pkg.setPkgr(pkgr);
+        POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
+        combineBloomOp.plan.addAsLeaf(pkg);
+        combineBloomOp.plan.addAsLeaf(combineBloomOutput);
+
+        edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+        edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+
+        // Add combiner as well.
+        POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+        combinerPkgr.setCombiner(true);
+        combinerPkgr.setKeyType(DataType.INTEGER);
+        pkg_c.setPkgr(combinerPkgr);
+        pkg_c.setNumInps(1);
+        edge.combinePlan.addAsLeaf(pkg_c);
+        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+        prjKey.setResultType(DataType.INTEGER);
+        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(prjKey);
+        clrInps.add(pp);
+        POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+        clr.setOutputKey(combineBloomOpKey);
+        edge.combinePlan.addAsLeaf(clr);
+
+        if (createBloomInMap) {
+            // No combiner needed on map as there will be only one bloom filter per map for each partition
+            // In the reducer, the bloom filters will be combined with the same logic as reduce in BloomPackager
+            edge.setCombinerInMap(false);
+            edge.setCombinerInReducer(true);
+        } else {
+            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+            // Do distinct of the keys on the map side to reduce data sent to reducers.
+            // In case of reduce, not adding a combiner and doing the distinct during reduce itself.
+            // If needed one can be added later
+            edge.setCombinerInMap(true);
+            edge.setCombinerInReducer(false);
+        }
+
+        // Broadcast the final bloom filter to other inputs
+        for (TezOperator applyBloomOp : applyBloomOps) {
+            applyBloomOp.markFilterBloom();
+            lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
+            POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters);
+            applyBloomOp.plan.remove(lr);
+            applyBloomOp.plan.addAsLeaf(bfr);
+            bfr.setInputKey(combineBloomOpKey);
+            edge = new TezEdgeDescriptor();
+            edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+            edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+            TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge);
+            combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
+        }
+
+    }
+
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
index cfdd3f1..97f94d5 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
@@ -31,9 +31,13 @@
  * Descriptor for Tez edge. It holds combine plan as well as edge properties.
  */
 public class TezEdgeDescriptor implements Serializable {
-    // Combiner runs on both input and output of Tez edge.
-    transient public PhysicalPlan combinePlan;
+
+    public transient PhysicalPlan combinePlan;
     private boolean needsDistinctCombiner;
+    // Combiner runs on both input and output of Tez edge by default
+    // It can be configured to run only in output (map) or input (reduce)
+    private Boolean combinerInMap;
+    private Boolean combinerInReducer;
 
     public String inputClassName;
     public String outputClassName;
@@ -74,6 +78,22 @@
         needsDistinctCombiner = nic;
     }
 
+    public Boolean getCombinerInMap() {
+        return combinerInMap;
+    }
+
+    public void setCombinerInMap(Boolean combinerInMap) {
+        this.combinerInMap = combinerInMap;
+    }
+
+    public Boolean getCombinerInReducer() {
+        return combinerInReducer;
+    }
+
+    public void setCombinerInReducer(Boolean combinerInReducer) {
+        this.combinerInReducer = combinerInReducer;
+    }
+
     public boolean isUseSecondaryKey() {
         return useSecondaryKey;
     }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
index 4d134ae..5d8ade9 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
@@ -181,7 +181,11 @@
         // Indicate if this job is a native job
         NATIVE,
         // Indicate if this job does rank counter
-        RANK_COUNTER;
+        RANK_COUNTER,
+        // Indicate if this job constructs bloom filter
+        BUILDBLOOM,
+        // Indicate if this job applies bloom filter
+        FILTERBLOOM;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -453,6 +457,22 @@
         feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
     }
 
+    public boolean isBuildBloom() {
+        return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public void markBuildBloom() {
+        feature.set(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public boolean isFilterBloom() {
+        return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
+    public void markFilterBloom() {
+        feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
index bf0a837..21ccbd3 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
@@ -31,6 +31,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -161,7 +162,7 @@
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
-            if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
+            if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
                 return;
             }
             loRearrangeFound++;
@@ -180,7 +181,9 @@
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
 
-            Integer index = Integer.valueOf(lrearrange.getIndex());
+            // For BloomPackager there is only one input, but the
+            // POBuildBloomRearrangeTez index is the join's index and can be non-zero
+            Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex());
             if(keyInfo.get(index) != null) {
                 if (isPOSplit) {
                     // Case of POSplit having more than one input in case of self join or union
@@ -197,12 +200,20 @@
 
             }
 
-            keyInfo.put(index,
-                    new Pair<Boolean, Map<Integer, Integer>>(
-                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-            pkg.getPkgr().setKeyInfo(keyInfo);
-            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
-            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            if (pkg.getPkgr() instanceof BloomPackager ) {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                Boolean.FALSE, new HashMap<Integer, Integer>()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+            } else {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+                pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+                pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            }
+
         }
 
         /**
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
new file mode 100644
index 0000000..7e3e325
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class BloomPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private boolean bloomCreatedInMap;
+    private int vectorSizeBytes;
+    private int numHash;
+    private int hashType;
+    private byte bloomKeyType;
+    private boolean isCombiner;
+
+    private transient ByteArrayOutputStream baos;
+    private transient Iterator<Object> distinctKeyIter;
+
+    public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super();
+        this.bloomCreatedInMap = bloomCreatedInMap;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    public void setBloomKeyType(byte keyType) {
+        bloomKeyType = keyType;
+    }
+
+    public void setCombiner(boolean isCombiner) {
+        this.isCombiner = isCombiner;
+    }
+
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
+    @Override
+    public Result getNext() throws ExecException {
+        try {
+            if (bloomCreatedInMap) {
+                if (bags == null) {
+                    return new Result(POStatus.STATUS_EOP, null);
+                }
+                // Same function for combiner and reducer
+                return combineBloomFilters();
+            } else {
+                if (isCombiner) {
+                    return getDistinctBloomKeys();
+                } else {
+                    if (bags == null) {
+                        return new Result(POStatus.STATUS_EOP, null);
+                    }
+                    return createBloomFilter();
+                }
+            }
+        } catch (IOException e) {
+            throw new ExecException("Error while constructing final bloom filter", e);
+        }
+    }
+
+    private Result combineBloomFilters() throws IOException {
+        // We get a bag of bloom filters. Combine them into one
+        Iterator<Tuple> iter = bags[0].iterator();
+        Tuple tup = iter.next();
+        DataByteArray bloomBytes = (DataByteArray) tup.get(0);
+        BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+        while (iter.hasNext()) {
+            tup = iter.next();
+            bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
+    }
+
+    private Result createBloomFilter() throws IOException {
+        // We get a bag of keys. Create a bloom filter from them
+        // First do distinct of the keys. Not using DistinctBag as memory should not be a problem.
+        HashSet<Object> bloomKeys = new HashSet<>();
+        Iterator<Tuple> iter = bags[0].iterator();
+        while (iter.hasNext()) {
+            bloomKeys.add(iter.next().get(0));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+        for (Object bloomKey: bloomKeys) {
+            Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
+            bloomFilter.add(k);
+        }
+        bloomKeys = null;
+        return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64);
+
+    }
+
+    private Result getSerializedBloomFilter(Object partition,
+            BloomFilter bloomFilter, int serializedSize) throws ExecException,
+            IOException {
+        if (baos == null) {
+            baos = new ByteArrayOutputStream(serializedSize);
+        }
+        baos.reset();
+        DataOutputStream dos = new DataOutputStream(baos);
+        bloomFilter.write(dos);
+        dos.flush();
+
+        Tuple res = mTupleFactory.newTuple(2);
+        res.set(0, partition);
+        res.set(1, new DataByteArray(baos.toByteArray()));
+
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+
+    private Result getDistinctBloomKeys() throws ExecException {
+        if (distinctKeyIter == null) {
+            HashSet<Object> bloomKeys = new HashSet<>();
+            Iterator<Tuple> iter = bags[0].iterator();
+            while (iter.hasNext()) {
+                bloomKeys.add(iter.next().get(0));
+            }
+            distinctKeyIter = bloomKeys.iterator();
+        }
+        if (distinctKeyIter.hasNext()) {
+            Tuple res = mTupleFactory.newTuple(2);
+            res.set(0, key);
+            res.set(1, distinctKeyIter.next());
+
+            Result r = new Result();
+            r.result = res;
+            r.returnStatus = POStatus.STATUS_OK;
+            return r;
+        }
+        distinctKeyIter = null;
+        return new Result(POStatus.STATUS_EOP, null);
+    }
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
new file mode 100644
index 0000000..82b599d
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
+    private String inputKey;
+    private transient KeyValueReader reader;
+    private transient String cacheKey;
+    private int numBloomFilters;
+    private transient BloomFilter[] bloomFilters;
+
+    public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
+        super(lr);
+        this.numBloomFilters = numBloomFilters;
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "bloom-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            bloomFilters = (BloomFilter[]) cacheValue;
+            return;
+        }
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+            while (reader.next()) {
+                if (bloomFilters == null) {
+                    bloomFilters = new BloomFilter[numBloomFilters];
+                }
+                Tuple val = (Tuple) reader.getCurrentValue();
+                int index = (int) val.get(0);
+                bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+            }
+            ObjectCache.getInstance().cache(cacheKey, bloomFilters);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+
+        // If there is no bloom filter, then it means the input it is built from was empty
+        // Skip processing
+        if (bloomFilters == null) {
+            return RESULT_EOP;
+        }
+
+        while (true) {
+            res = super.getRearrangedTuple();
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        // Skip the record if key is not in the bloom filter
+                        if (!isKeyInBloomFilter(result.get(1))) {
+                            continue;
+                        }
+                        PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                case POStatus.STATUS_ERR:
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBloomFilterRearrange function. " + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+
+    private boolean isKeyInBloomFilter(Object key) throws ExecException {
+        if (key == null) {
+            // Null values are dropped in an inner join and in the case of outer join,
+            // POBloomFilterRearrangeTez is only in the plan on the non outer relation.
+            // So just skip them
+            return false;
+        }
+        if (bloomFilters.length == 1) {
+            // Skip computing hashcode
+            Key k = new Key(DataType.toBytes(key, keyType));
+            return bloomFilters[0].membershipTest(k);
+        } else {
+            int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+            BloomFilter filter = bloomFilters[partition];
+            if (filter != null) {
+                Key k = new Key(DataType.toBytes(key, keyType));
+                return filter.membershipTest(k);
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBloomFilterRearrangeTez) super.clone();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BloomFilter Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+    }
+
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
new file mode 100644
index 0000000..eb8a612
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * This operator writes out the key value for the hash join reduce operation similar to POLocalRearrangeTez.
+ * In addition, it also writes out the bloom filter constructed from the join keys
+ * in the case of bloomjoin map strategy or join keys themselves in case of reduce strategy.
+ *
+ * Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
+ * It also allows us to have lower false positives with smaller vector sizes.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
+
+    public static final String DEFAULT_BLOOM_STRATEGY = "map";
+    public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11;
+    public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3;
+    public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur";
+    public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024;
+
+    private String bloomOutputKey;
+    private boolean skipNullKeys = false;
+    private boolean createBloomInMap;
+    private int numBloomFilters;
+    private int vectorSizeBytes;
+    private int numHash;
+    private int hashType;
+
+    private transient BloomFilter[] bloomFilters;
+    private transient KeyValueWriter bloomWriter;
+    private transient PigNullableWritable nullKey;
+    private transient Tuple bloomValue;
+    private transient NullableTuple bloomNullableTuple;
+
+    public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
+            boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super(lr);
+        this.createBloomInMap = createBloomInMap;
+        this.numBloomFilters = numBloomFilters;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    public static int getNumBloomFilters(Configuration conf) {
+        if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) {
+            return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1);
+        } else {
+            return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE);
+        }
+    }
+
+    public void setSkipNullKeys(boolean skipNullKeys) {
+        this.skipNullKeys = skipNullKeys;
+    }
+
+    public void setBloomOutputKey(String bloomOutputKey) {
+        this.bloomOutputKey = bloomOutputKey;
+    }
+
+    @Override
+    public boolean containsOutputKey(String key) {
+        if(super.containsOutputKey(key)) {
+            return true;
+        }
+        return bloomOutputKey.equals(key);
+    }
+
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey, bloomOutputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (oldOutputKey.equals(outputKey)) {
+            outputKey = newOutputKey;
+        } else if (oldOutputKey.equals(bloomOutputKey)) {
+            bloomOutputKey = newOutputKey;
+        }
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        super.attachOutputs(outputs, conf);
+        LogicalOutput output = outputs.get(bloomOutputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
+        }
+        try {
+            bloomWriter = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+        bloomFilters = new BloomFilter[numBloomFilters];
+        bloomValue = mTupleFactory.newTuple(1);
+        bloomNullableTuple = new NullableTuple(bloomValue);
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+
+        PigNullableWritable key;
+        while (true) {
+            res = super.getRearrangedTuple();
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        Object keyObj = result.get(1);
+                        if (keyObj != null) {
+                            key = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            // Only non-null keys reach here: null keys cannot be part of the bloom
+                            // filter, and since they are also dropped during the join we can skip them
+                            if (createBloomInMap) {
+                                addKeyToBloomFilter(keyObj);
+                            } else {
+                                writeJoinKeyForBloom(keyObj);
+                            }
+                        } else if (skipNullKeys) {
+                            // Inner join. So don't bother writing null key
+                            continue;
+                        } else {
+                            if (nullKey == null) {
+                                nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            }
+                            key = nullKey;
+                        }
+
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    if (this.parentPlan.endOfAllInput && createBloomInMap) {
+                        // In case of Split, we will get EOP after every record,
+                        // so check for endOfAllInput
+                        writeBloomFilters();
+                    }
+                case POStatus.STATUS_ERR: // STATUS_EOP intentionally falls through to here
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBuildBloomRearrange function. " + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+
+    private void addKeyToBloomFilter(Object key) throws ExecException {
+        Key k = new Key(DataType.toBytes(key, keyType));
+        if (bloomFilters.length == 1) {
+            if (bloomFilters[0] == null) {
+                bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+            }
+            bloomFilters[0].add(k);
+        } else {
+            int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+            BloomFilter filter = bloomFilters[partition];
+            if (filter == null) {
+                filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+                bloomFilters[partition] = filter;
+            }
+            filter.add(k);
+        }
+    }
+
+    private void writeJoinKeyForBloom(Object key) throws IOException {
+        int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+        bloomValue.set(0, key);
+        bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+    }
+
+    private void writeBloomFilters() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64);
+        for (int i = 0; i < bloomFilters.length; i++) {
+            if (bloomFilters[i] != null) {
+                DataOutputStream dos = new DataOutputStream(baos);
+                bloomFilters[i].write(dos);
+                dos.flush();
+                bloomValue.set(0, new DataByteArray(baos.toByteArray()));
+                bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple);
+                baos.reset();
+            }
+        }
+    }
+
+    @Override
+    public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBuildBloomRearrangeTez) super.clone();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BuildBloom Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
+    }
+
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
index 0f07483..1552103 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
@@ -71,8 +71,8 @@
         }
     }
 
-    public String getOutputKey() {
-        return outputKey;
+    public boolean containsOutputKey(String key) {
+        return outputKey.equals(key);
     }
 
     public void setOutputKey(String outputKey) {
@@ -122,6 +122,10 @@
         }
     }
 
+    protected Result getRearrangedTuple() throws ExecException {
+        return super.getNextTuple();
+    }
+
     @Override
     public Result getNextTuple() throws ExecException {
         res = super.getNextTuple();
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
index 2914128..99ebf1c 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
@@ -129,7 +129,9 @@
                 finished[i] = !readers.get(i).next();
             }
 
-            this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+            this.readOnceOneBag = (numInputs == 1)
+                    && (pkgr instanceof CombinerPackager
+                            || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
             if (readOnceOneBag) {
                 readOnce[0] = true;
             }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
index a14a9b7..d0b994d 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
@@ -69,6 +69,11 @@
         }
 
         for (TezOperator from : predecessors) {
+            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+            if (!combinePlan.isEmpty()) {
+                // Cases like bloom join have combine plan already set
+                continue;
+            }
             List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
             if (rearranges.isEmpty()) {
                 continue;
@@ -77,7 +82,7 @@
             POLocalRearrangeTez connectingLR = null;
             PhysicalPlan rearrangePlan = from.plan;
             for (POLocalRearrangeTez lr : rearranges) {
-                if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+                if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                     connectingLR = lr;
                     break;
                 }
@@ -90,7 +95,6 @@
 
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.
-            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
             CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
 
             if(!combinePlan.isEmpty()) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
index 6e35c37..b9de7d0 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
@@ -123,8 +123,8 @@
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled
-                            && tezOp.isIntermediateReducer()
                             && !tezOp.isDontEstimateParallelism()
+                            && tezOp.isIntermediateReducer()
                             && tezOp.isOverrideIntermediateParallelism()) {
                         overrideRequestedParallelism = true;
                     }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
index c56f83f..caf1786 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
@@ -75,7 +75,7 @@
         POLocalRearrangeTez connectingLR = null;
         PhysicalPlan rearrangePlan = from.plan;
         for (POLocalRearrangeTez lr : rearranges) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 connectingLR = lr;
                 break;
             }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
index 5020df8..d17a75d 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
@@ -30,6 +30,8 @@
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        tezOp.setEstimatedParallelism(-1);
+        if (!tezOp.isDontEstimateParallelism()) {
+            tezOp.setEstimatedParallelism(-1);
+        }
     }
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
index c8afd61..d892e9e 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
@@ -97,16 +97,16 @@
 
         bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
 
+        // If we have already estimated parallelism, use that one
+        if (tezOper.getEstimatedParallelism() != -1) {
+            return tezOper.getEstimatedParallelism();
+        }
+
         // If parallelism is set explicitly, respect it
         if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
             return tezOper.getRequestedParallelism();
         }
 
-        // If we have already estimated parallelism, use that one
-        if (tezOper.getEstimatedParallelism()!=-1) {
-            return tezOper.getEstimatedParallelism();
-        }
-
         List<TezOperator> preds = plan.getPredecessors(tezOper);
         if (preds==null) {
             throw new IOException("Cannot estimate parallelism for source vertex");
diff --git a/src/org/apache/pig/newplan/logical/relational/LOJoin.java b/src/org/apache/pig/newplan/logical/relational/LOJoin.java
index d1d5d8c..5698bb5 100644
--- a/src/org/apache/pig/newplan/logical/relational/LOJoin.java
+++ b/src/org/apache/pig/newplan/logical/relational/LOJoin.java
@@ -38,6 +38,7 @@
      */
     public static enum JOINTYPE {
         HASH,    // Hash Join
+        BLOOM,   // Bloom Join
         REPLICATED, // Fragment Replicated join
         SKEWED, // Skewed Join
         MERGE,   // Sort Merge Join
diff --git a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
index 99df7f0..7faa1fd 100644
--- a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
+++ b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
@@ -1414,7 +1414,7 @@
 
             return;
         }
-        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){
             POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
             POForEach fe = compileFE4Flattening(innerFlags,  scope, parallel, alias, location, inputs);
             currentPlan.add(fe);
@@ -1425,7 +1425,20 @@
                         e.getErrorCode(),e.getErrorSource(),e);
             }
             logToPhyMap.put(loj, fe);
-            poPackage.getPkgr().setPackageType(PackageType.JOIN);
+            if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
+                if (innerFlags.length == 2) {
+                    if (!innerFlags[0] && !innerFlags[1]) {
+                        throw new LogicalToPhysicalTranslatorException(
+                                "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
+                                        ". Bloom join cannot be used with a FULL OUTER join.",
+                                1109,
+                                PigException.INPUT);
+                    }
+                }
+                poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
+            } else {
+                poPackage.getPkgr().setPackageType(PackageType.JOIN);
+            }
         }
         translateSoftLinks(loj);
     }
diff --git a/src/org/apache/pig/parser/LogicalPlanBuilder.java b/src/org/apache/pig/parser/LogicalPlanBuilder.java
index 4cc2e18..b160585 100644
--- a/src/org/apache/pig/parser/LogicalPlanBuilder.java
+++ b/src/org/apache/pig/parser/LogicalPlanBuilder.java
@@ -1788,6 +1788,8 @@
             return JOINTYPE.REPLICATED;
          } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
              return LOJoin.JOINTYPE.HASH;
+         } else if( modifier.equalsIgnoreCase( "bloom" ) ) {
+             return LOJoin.JOINTYPE.BLOOM;
          } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
              return JOINTYPE.SKEWED;
          } else if (modifier.equalsIgnoreCase("merge")) {
@@ -1796,7 +1798,7 @@
              return JOINTYPE.MERGESPARSE;
          } else {
              throw new ParserValidationException( intStream, loc,
-                      "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
+                      "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are valid JOIN modifiers." );
          }
     }
 
diff --git a/src/org/apache/pig/tools/pigstats/ScriptState.java b/src/org/apache/pig/tools/pigstats/ScriptState.java
index 6eff0d5..e755f1c 100644
--- a/src/org/apache/pig/tools/pigstats/ScriptState.java
+++ b/src/org/apache/pig/tools/pigstats/ScriptState.java
@@ -133,6 +133,8 @@
         MERGE_SPARSE_JOIN,
         REPLICATED_JOIN,
         SKEWED_JOIN,
+        BUILD_BLOOM,
+        FILTER_BLOOM,
         HASH_JOIN,
         COLLECTED_GROUP,
         MERGE_COGROUP,
@@ -312,7 +314,7 @@
                 maxScriptSize = Integer.valueOf(prop);
             }
         }
-       
+
         this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize)
                                                    : script;
 
@@ -485,6 +487,10 @@
         public void visit(LOJoin op) {
             if (op.getJoinType() == JOINTYPE.HASH) {
                 feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+            } else if (op.getJoinType() == JOINTYPE.BLOOM) {
+                feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+                feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+                feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
             } else if (op.getJoinType() == JOINTYPE.MERGE) {
                 feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
             } else if (op.getJoinType() == JOINTYPE.MERGESPARSE) {
@@ -506,6 +512,7 @@
             feature.set(PIG_FEATURE.RANK.ordinal());
         }
 
+        @Override
         public void visit(LOSort op) {
             feature.set(PIG_FEATURE.ORDER_BY.ordinal());
         }
diff --git a/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java b/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
index fd4256b..716f079 100644
--- a/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
+++ b/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
@@ -275,6 +275,12 @@
                 if (tezOp.isRegularJoin()) {
                     feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
                 }
+                if (tezOp.isBuildBloom()) {
+                    feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+                }
+                if (tezOp.isFilterBloom()) {
+                    feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
+                }
                 if (tezOp.isUnion()) {
                     feature.set(PIG_FEATURE.UNION.ordinal());
                 }
diff --git a/test/e2e/pig/build.xml b/test/e2e/pig/build.xml
index a0dc32d..1ec9cf6 100644
--- a/test/e2e/pig/build.xml
+++ b/test/e2e/pig/build.xml
@@ -137,6 +137,7 @@
       <path path="${test.location}/tests/multiquery.conf"/>
       <path path="${test.location}/tests/negative.conf"/>
       <path path="${test.location}/tests/nightly.conf"/>
+      <path path="${test.location}/tests/join.conf"/>
       <path path="${test.location}/tests/streaming.conf"/>
       <path path="${test.location}/tests/streaming_local.conf"/>
       <path path="${test.location}/tests/turing_jython.conf"/>
diff --git a/test/e2e/pig/tests/join.conf b/test/e2e/pig/tests/join.conf
new file mode 100644
index 0000000..97f6c05
--- /dev/null
+++ b/test/e2e/pig/tests/join.conf
@@ -0,0 +1,310 @@
+#!/usr/bin/env perl
+############################################################################           
+#  Licensed to the Apache Software Foundation (ASF) under one or more                  
+#  contributor license agreements.  See the NOTICE file distributed with               
+#  this work for additional information regarding copyright ownership.                 
+#  The ASF licenses this file to You under the Apache License, Version 2.0             
+#  (the "License"); you may not use this file except in compliance with                
+#  the License.  You may obtain a copy of the License at                               
+#                                                                                      
+#      http://www.apache.org/licenses/LICENSE-2.0                                      
+#                                                                                      
+#  Unless required by applicable law or agreed to in writing, software                 
+#  distributed under the License is distributed on an "AS IS" BASIS,                   
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.            
+#  See the License for the specific language governing permissions and                 
+#  limitations under the License.                                                      
+                                                                                       
+###############################################################################
+
+$cfg = {
+    'driver' => 'Pig',
+
+    'groups' => [
+        {
+        'name' => 'BloomJoin_Map',
+        'execonly' => 'tez',
+        'tests' => [
+            {
+            # Tuple join key
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # bytearray join key
+            'num' => 2,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left outer join and chararray join key
+            'num' => 3,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Right outer join
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a union
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Right input from a union and integer join key
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a split
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            },
+            {
+            # Right input from a split
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            },
+        ] # end of tests
+        },
+        {
+        'name' => 'BloomJoin_Reduce',
+        'execonly' => 'tez',
+        'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+        'tests' => [
+            {
+            # Tuple join key
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # bytearray join key
+            'num' => 2,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left outer join and chararray join key
+            'num' => 3,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Right outer join
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a union
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Right input from a union and integer join key
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a split
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            },
+            {
+            # Right input from a split
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            },
+        ] # end of tests
+        }
+    ] # end of groups
+};
\ No newline at end of file
diff --git a/test/e2e/pig/tests/multiquery.conf b/test/e2e/pig/tests/multiquery.conf
index 4bef81a..667659a 100644
--- a/test/e2e/pig/tests/multiquery.conf
+++ b/test/e2e/pig/tests/multiquery.conf
@@ -906,7 +906,38 @@
 
 n = JOIN a BY name, m BY name;
 store n into ':OUTPATH:';\,
-            }
+            },
+            {
+            # Self join bloom left outer
+            'num' => 12,
+            'execonly' => 'tez',
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join bloom left outer with strategy as reduce
+            'num' => 13,
+            'execonly' => 'tez',
+            'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+            },
             ] # end of tests
         },
 
diff --git a/test/e2e/pig/tests/orc.conf b/test/e2e/pig/tests/orc.conf
index 6277586..9498d88 100644
--- a/test/e2e/pig/tests/orc.conf
+++ b/test/e2e/pig/tests/orc.conf
@@ -1,3 +1,21 @@
+#!/usr/bin/env perl
+############################################################################           
+#  Licensed to the Apache Software Foundation (ASF) under one or more                  
+#  contributor license agreements.  See the NOTICE file distributed with               
+#  this work for additional information regarding copyright ownership.                 
+#  The ASF licenses this file to You under the Apache License, Version 2.0             
+#  (the "License"); you may not use this file except in compliance with                
+#  the License.  You may obtain a copy of the License at                               
+#                                                                                      
+#      http://www.apache.org/licenses/LICENSE-2.0                                      
+#                                                                                      
+#  Unless required by applicable law or agreed to in writing, software                 
+#  distributed under the License is distributed on an "AS IS" BASIS,                   
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.            
+#  See the License for the specific language governing permissions and                 
+#  limitations under the License.                                                      
+                                                                                       
+###############################################################################
 $cfg = {
         'driver' => 'Pig',
         'nummachines' => 5,
diff --git a/test/org/apache/pig/test/TestEmptyInputDir.java b/test/org/apache/pig/test/TestEmptyInputDir.java
index ffeb34b..a9a46af 100644
--- a/test/org/apache/pig/test/TestEmptyInputDir.java
+++ b/test/org/apache/pig/test/TestEmptyInputDir.java
@@ -246,6 +246,66 @@
         }
     }
 
+    @Test
+    public void testBloomJoin() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+        w.println("C = join B by $0, A by $0 using 'bloom';");
+        w.println("D = join A by $0, B by $0 using 'bloom';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into 'output1';");
+        w.close();
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(0, stats.getNumberRecords("output1"));
+            assertEmptyOutputFile();
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "output1");
+        }
+    }
+
+    @Test
+    public void testBloomJoinOuter() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+        w.println("C = join B by $0 left outer, A by $0 using 'bloom';");
+        w.println("D = join A by $0 left outer, B by $0 using 'bloom';");
+        w.println("E = join B by $0 right outer, A by $0 using 'bloom';");
+        w.println("F = join A by $0 right outer, B by $0 using 'bloom';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into 'output1';");
+        w.println("store E into 'output2';");
+        w.println("store F into 'output3';");
+        w.close();
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(2, stats.getNumberRecords("output1"));
+            assertEquals(2, stats.getNumberRecords("output2"));
+            assertEquals(0, stats.getNumberRecords("output3"));
+            assertEmptyOutputFile();
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "output1");
+            Util.deleteFile(cluster, "output2");
+            Util.deleteFile(cluster, "output3");
+        }
+    }
+
     private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
         FileSystem fs = cluster.getFileSystem();
         FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
new file mode 100644
index 0000000..c767523
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-48	->	Tez vertex scope-49,Tez vertex scope-50,
+Tez vertex scope-50	->	Tez vertex scope-46,Tez vertex scope-47,
+Tez vertex scope-46	->	Tez vertex scope-49,
+Tez vertex scope-47	->	Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-48
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{bytearray}(false) - scope-30	->	[ scope-49, scope-50]
+|   |
+|   Project[bytearray][0] - scope-31
+|
+|---c: New For Each(false,false)[bag] - scope-20
+    |   |
+    |   Project[bytearray][0] - scope-15
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][1] - scope-17
+    |
+    |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-50
+# Combine plan on edge <scope-48>
+Local Rearrange[tuple]{int}(false) - scope-55	->	 scope-50
+|   |
+|   Project[int][0] - scope-54
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+# Plan on vertex
+POValueOutputTez - scope-52	->	 [scope-46, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-51
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-27
+|
+|---b: New For Each(false,false)[bag] - scope-6
+    |   |
+    |   Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-4
+    |   |
+    |   |---Project[bytearray][1] - scope-3
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-47
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-28	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-29
+|
+|---a: New For Each(false,false)[bag] - scope-13
+    |   |
+    |   Project[bytearray][0] - scope-8
+    |   |
+    |   Cast[int] - scope-11
+    |   |
+    |   |---Project[bytearray][1] - scope-10
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-7
+Tez vertex scope-49
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-45
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-44
+    |   |
+    |   Project[bytearray][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |   |
+    |   Project[int][5] - scope-42
+    |
+    |---d: New For Each(true,true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-32
+        |   |
+        |   Project[bag][2] - scope-33
+        |   |
+        |   Project[bag][3] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{bytearray} - scope-25
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld
new file mode 100644
index 0000000..c767523
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-48	->	Tez vertex scope-49,Tez vertex scope-50,
+Tez vertex scope-50	->	Tez vertex scope-46,Tez vertex scope-47,
+Tez vertex scope-46	->	Tez vertex scope-49,
+Tez vertex scope-47	->	Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-48
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{bytearray}(false) - scope-30	->	[ scope-49, scope-50]
+|   |
+|   Project[bytearray][0] - scope-31
+|
+|---c: New For Each(false,false)[bag] - scope-20
+    |   |
+    |   Project[bytearray][0] - scope-15
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][1] - scope-17
+    |
+    |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-50
+# Combine plan on edge <scope-48>
+Local Rearrange[tuple]{int}(false) - scope-55	->	 scope-50
+|   |
+|   Project[int][0] - scope-54
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+# Plan on vertex
+POValueOutputTez - scope-52	->	 [scope-46, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-51
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-27
+|
+|---b: New For Each(false,false)[bag] - scope-6
+    |   |
+    |   Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-4
+    |   |
+    |   |---Project[bytearray][1] - scope-3
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-47
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-28	<-	 scope-50	->	 scope-49
+|   |
+|   Project[bytearray][0] - scope-29
+|
+|---a: New For Each(false,false)[bag] - scope-13
+    |   |
+    |   Project[bytearray][0] - scope-8
+    |   |
+    |   Cast[int] - scope-11
+    |   |
+    |   |---Project[bytearray][1] - scope-10
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-7
+Tez vertex scope-49
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-45
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-44
+    |   |
+    |   Project[bytearray][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |   |
+    |   Project[int][5] - scope-42
+    |
+    |---d: New For Each(true,true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-32
+        |   |
+        |   Project[bag][2] - scope-33
+        |   |
+        |   Project[bag][3] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{bytearray} - scope-25
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
new file mode 100644
index 0000000..de14a55
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
@@ -0,0 +1,83 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-39	->	Tez vertex scope-41,Tez vertex scope-42,
+Tez vertex scope-42	->	Tez vertex scope-40,
+Tez vertex scope-40	->	Tez vertex scope-41,
+Tez vertex scope-41
+
+Tez vertex scope-39
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{chararray}(false) - scope-20	->	[ scope-41, scope-42]
+|   |
+|   Project[chararray][0] - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[chararray] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-42
+# Combine plan on edge <scope-39>
+Local Rearrange[tuple]{int}(false) - scope-47	->	 scope-42
+|   |
+|   Project[int][0] - scope-46
+|
+|---Package(BloomPackager)[tuple]{int} - scope-45
+# Plan on vertex
+POValueOutputTez - scope-44	->	 [scope-40]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-43
+Tez vertex scope-40
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22	<-	 scope-42	->	 scope-41
+|   |
+|   Project[chararray][0] - scope-23
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[chararray] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-41
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-38
+|
+|---e: New For Each(false,false,false)[bag] - scope-37
+    |   |
+    |   Project[chararray][0] - scope-31
+    |   |
+    |   Project[int][1] - scope-33
+    |   |
+    |   Project[int][3] - scope-35
+    |
+    |---d: New For Each(true,true)[tuple] - scope-30
+        |   |
+        |   Project[bag][1] - scope-24
+        |   |
+        |   POBinCond[bag] - scope-29
+        |   |
+        |   |---Project[bag][2] - scope-25
+        |   |
+        |   |---POUserFunc(org.apache.pig.builtin.IsEmpty)[boolean] - scope-27
+        |   |   |
+        |   |   |---Project[bag][2] - scope-26
+        |   |
+        |   |---Constant({(,)}) - scope-28
+        |
+        |---d: Package(Packager)[tuple]{chararray} - scope-19
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld
new file mode 100644
index 0000000..de14a55
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld
@@ -0,0 +1,83 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-39	->	Tez vertex scope-41,Tez vertex scope-42,
+Tez vertex scope-42	->	Tez vertex scope-40,
+Tez vertex scope-40	->	Tez vertex scope-41,
+Tez vertex scope-41
+
+Tez vertex scope-39
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{chararray}(false) - scope-20	->	[ scope-41, scope-42]
+|   |
+|   Project[chararray][0] - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[chararray] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-42
+# Combine plan on edge <scope-39>
+Local Rearrange[tuple]{int}(false) - scope-47	->	 scope-42
+|   |
+|   Project[int][0] - scope-46
+|
+|---Package(BloomPackager)[tuple]{int} - scope-45
+# Plan on vertex
+POValueOutputTez - scope-44	->	 [scope-40]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-43
+Tez vertex scope-40
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22	<-	 scope-42	->	 scope-41
+|   |
+|   Project[chararray][0] - scope-23
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[chararray] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-41
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-38
+|
+|---e: New For Each(false,false,false)[bag] - scope-37
+    |   |
+    |   Project[chararray][0] - scope-31
+    |   |
+    |   Project[int][1] - scope-33
+    |   |
+    |   Project[int][3] - scope-35
+    |
+    |---d: New For Each(true,true)[tuple] - scope-30
+        |   |
+        |   Project[bag][1] - scope-24
+        |   |
+        |   POBinCond[bag] - scope-29
+        |   |
+        |   |---Project[bag][2] - scope-25
+        |   |
+        |   |---POUserFunc(org.apache.pig.builtin.IsEmpty)[boolean] - scope-27
+        |   |   |
+        |   |   |---Project[bag][2] - scope-26
+        |   |
+        |   |---Constant({(,)}) - scope-28
+        |
+        |---d: Package(Packager)[tuple]{chararray} - scope-19
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
new file mode 100644
index 0000000..72a9b16
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-45	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex group scope-59	->	Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-44,
+Tez vertex scope-44	->	Tez vertex scope-51,
+Tez vertex group scope-58	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-45
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-60	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-61
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-46
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-62	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-63
+|
+|---c: New For Each(false,false)[bag] - scope-23
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][0] - scope-17
+    |   |
+    |   Cast[int] - scope-21
+    |   |
+    |   |---Project[bytearray][1] - scope-20
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex group scope-59	<-	 [scope-45, scope-46]	->	 scope-52
+# No plan on vertex group
+Tez vertex scope-52
+# Combine plan on edge <scope-45>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Combine plan on edge <scope-46>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-44]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-44
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex group scope-58	<-	 [scope-45, scope-46]	->	 scope-51
+# No plan on vertex group
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][0] - scope-36
+    |   |
+    |   Project[int][1] - scope-38
+    |   |
+    |   Project[int][3] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld
new file mode 100644
index 0000000..72a9b16
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-45	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46	->	Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex group scope-59	->	Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-44,
+Tez vertex scope-44	->	Tez vertex scope-51,
+Tez vertex group scope-58	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-45
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-60	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-61
+|
+|---b: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-46
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-62	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-63
+|
+|---c: New For Each(false,false)[bag] - scope-23
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][0] - scope-17
+    |   |
+    |   Cast[int] - scope-21
+    |   |
+    |   |---Project[bytearray][1] - scope-20
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex group scope-59	<-	 [scope-45, scope-46]	->	 scope-52
+# No plan on vertex group
+Tez vertex scope-52
+# Combine plan on edge <scope-45>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Combine plan on edge <scope-46>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-44]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-44
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex group scope-58	<-	 [scope-45, scope-46]	->	 scope-51
+# No plan on vertex group
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][0] - scope-36
+    |   |
+    |   Project[int][1] - scope-38
+    |   |
+    |   Project[int][3] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
new file mode 100644
index 0000000..928b9a5
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
@@ -0,0 +1,97 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-44	->	Tez vertex scope-46,
+Tez vertex scope-45	->	Tez vertex scope-46,
+Tez vertex scope-50	->	Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-46,
+Tez vertex scope-46	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-44
+# Plan on vertex
+POValueOutputTez - scope-48	->	 [scope-46]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-45
+# Plan on vertex
+POValueOutputTez - scope-49	->	 [scope-46]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-31	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-24
+    |   |
+    |   Cast[int] - scope-19
+    |   |
+    |   |---Project[bytearray][0] - scope-18
+    |   |
+    |   Cast[int] - scope-22
+    |   |
+    |   |---Project[bytearray][1] - scope-21
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-52
+# Combine plan on edge <scope-50>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-46]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---POShuffledValueInputTez - scope-47	<-	 [scope-44, scope-45]
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld
new file mode 100644
index 0000000..928b9a5
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld
@@ -0,0 +1,97 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-44	->	Tez vertex scope-46,
+Tez vertex scope-45	->	Tez vertex scope-46,
+Tez vertex scope-50	->	Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-46,
+Tez vertex scope-46	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-44
+# Plan on vertex
+POValueOutputTez - scope-48	->	 [scope-46]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-45
+# Plan on vertex
+POValueOutputTez - scope-49	->	 [scope-46]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+    |   |
+    |   Cast[int] - scope-10
+    |   |
+    |   |---Project[bytearray][0] - scope-9
+    |   |
+    |   Cast[int] - scope-13
+    |   |
+    |   |---Project[bytearray][1] - scope-12
+    |
+    |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-31	->	[ scope-51, scope-52]
+|   |
+|   Project[int][0] - scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-24
+    |   |
+    |   Cast[int] - scope-19
+    |   |
+    |   |---Project[bytearray][0] - scope-18
+    |   |
+    |   Cast[int] - scope-22
+    |   |
+    |   |---Project[bytearray][1] - scope-21
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-52
+# Combine plan on edge <scope-50>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-46]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-30
+|
+|---POShuffledValueInputTez - scope-47	<-	 [scope-44, scope-45]
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+    |   |
+    |   Project[int][2] - scope-36
+    |   |
+    |   Project[int][3] - scope-38
+    |   |
+    |   Project[int][1] - scope-40
+    |
+    |---d: New For Each(true,true)[tuple] - scope-35
+        |   |
+        |   Project[bag][1] - scope-33
+        |   |
+        |   Project[bag][2] - scope-34
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
new file mode 100644
index 0000000..a426025
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
@@ -0,0 +1,107 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-60	->	Tez vertex scope-61,Tez vertex scope-62,
+Tez vertex scope-62	->	Tez vertex scope-54,Tez vertex scope-58,
+Tez vertex scope-54	->	Tez vertex scope-58,Tez vertex scope-61,
+Tez vertex scope-58	->	Tez vertex scope-61,
+Tez vertex scope-61
+
+Tez vertex scope-60
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-38	->	[ scope-61, scope-62]
+|   |
+|   Project[int][0] - scope-39
+|
+|---b: New For Each(false,false)[bag] - scope-28
+    |   |
+    |   Cast[int] - scope-23
+    |   |
+    |   |---Project[bytearray][0] - scope-22
+    |   |
+    |   Cast[int] - scope-26
+    |   |
+    |   |---Project[bytearray][1] - scope-25
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-21
+Tez vertex scope-62
+# Combine plan on edge <scope-60>
+Local Rearrange[tuple]{int}(false) - scope-67	->	 scope-62
+|   |
+|   Project[int][0] - scope-66
+|
+|---Package(BloomPackager)[tuple]{int} - scope-65
+# Plan on vertex
+POValueOutputTez - scope-64	->	 [scope-54, scope-58]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-63
+Tez vertex scope-54
+# Plan on vertex
+a: Split - scope-68
+|   |
+|   d: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-62	->	 scope-61
+|   |   |
+|   |   Project[int][0] - scope-35
+|   |
+|   |---a1: Filter[bag] - scope-11
+|       |   |
+|       |   Equal To[boolean] - scope-14
+|       |   |
+|       |   |---Project[int][0] - scope-12
+|       |   |
+|       |   |---Constant(3) - scope-13
+|   |
+|   POValueOutputTez - scope-55	->	 [scope-58]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-58
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-36	<-	 scope-62	->	 scope-61
+|   |
+|   Project[int][0] - scope-37
+|
+|---a2: Filter[bag] - scope-17
+    |   |
+    |   Equal To[boolean] - scope-20
+    |   |
+    |   |---Project[int][0] - scope-18
+    |   |
+    |   |---Constant(4) - scope-19
+    |
+    |---POValueInputTez - scope-59	<-	 scope-54
+Tez vertex scope-61
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-53
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-52
+    |   |
+    |   Project[int][0] - scope-44
+    |   |
+    |   Project[int][1] - scope-46
+    |   |
+    |   Project[int][3] - scope-48
+    |   |
+    |   Project[int][5] - scope-50
+    |
+    |---d: New For Each(true,true,true)[tuple] - scope-43
+        |   |
+        |   Project[bag][1] - scope-40
+        |   |
+        |   Project[bag][2] - scope-41
+        |   |
+        |   Project[bag][3] - scope-42
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld
new file mode 100644
index 0000000..a426025
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld
@@ -0,0 +1,107 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-60	->	Tez vertex scope-61,Tez vertex scope-62,
+Tez vertex scope-62	->	Tez vertex scope-54,Tez vertex scope-58,
+Tez vertex scope-54	->	Tez vertex scope-58,Tez vertex scope-61,
+Tez vertex scope-58	->	Tez vertex scope-61,
+Tez vertex scope-61
+
+Tez vertex scope-60
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-38	->	[ scope-61, scope-62]
+|   |
+|   Project[int][0] - scope-39
+|
+|---b: New For Each(false,false)[bag] - scope-28
+    |   |
+    |   Cast[int] - scope-23
+    |   |
+    |   |---Project[bytearray][0] - scope-22
+    |   |
+    |   Cast[int] - scope-26
+    |   |
+    |   |---Project[bytearray][1] - scope-25
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-21
+Tez vertex scope-62
+# Combine plan on edge <scope-60>
+Local Rearrange[tuple]{int}(false) - scope-67	->	 scope-62
+|   |
+|   Project[int][0] - scope-66
+|
+|---Package(BloomPackager)[tuple]{int} - scope-65
+# Plan on vertex
+POValueOutputTez - scope-64	->	 [scope-54, scope-58]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-63
+Tez vertex scope-54
+# Plan on vertex
+a: Split - scope-68
+|   |
+|   d: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-62	->	 scope-61
+|   |   |
+|   |   Project[int][0] - scope-35
+|   |
+|   |---a1: Filter[bag] - scope-11
+|       |   |
+|       |   Equal To[boolean] - scope-14
+|       |   |
+|       |   |---Project[int][0] - scope-12
+|       |   |
+|       |   |---Constant(3) - scope-13
+|   |
+|   POValueOutputTez - scope-55	->	 [scope-58]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-58
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-36	<-	 scope-62	->	 scope-61
+|   |
+|   Project[int][0] - scope-37
+|
+|---a2: Filter[bag] - scope-17
+    |   |
+    |   Equal To[boolean] - scope-20
+    |   |
+    |   |---Project[int][0] - scope-18
+    |   |
+    |   |---Constant(4) - scope-19
+    |
+    |---POValueInputTez - scope-59	<-	 scope-54
+Tez vertex scope-61
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-53
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-52
+    |   |
+    |   Project[int][0] - scope-44
+    |   |
+    |   Project[int][1] - scope-46
+    |   |
+    |   Project[int][3] - scope-48
+    |   |
+    |   Project[int][5] - scope-50
+    |
+    |---d: New For Each(true,true,true)[tuple] - scope-43
+        |   |
+        |   Project[bag][1] - scope-40
+        |   |
+        |   Project[bag][2] - scope-41
+        |   |
+        |   Project[bag][3] - scope-42
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
new file mode 100644
index 0000000..c5a2000
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-49	->	Tez vertex scope-56,Tez vertex scope-57,
+Tez vertex scope-57	->	Tez vertex scope-53,
+Tez vertex scope-53	->	Tez vertex scope-56,
+Tez vertex scope-56
+
+Tez vertex scope-49
+# Plan on vertex
+a: Split - scope-63
+|   |
+|   a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-15
+|   |
+|   |---a2: Filter[bag] - scope-11
+|       |   |
+|       |   Equal To[boolean] - scope-14
+|       |   |
+|       |   |---Project[int][0] - scope-12
+|       |   |
+|       |   |---Constant(4) - scope-13
+|   |
+|   d: BuildBloom Rearrange[tuple]{int}(false) - scope-36	->	[ scope-56, scope-57]
+|   |   |
+|   |   Project[int][0] - scope-37
+|   |
+|   |---a1: Filter[bag] - scope-26
+|       |   |
+|       |   Equal To[boolean] - scope-29
+|       |   |
+|       |   |---Project[int][0] - scope-27
+|       |   |
+|       |   |---Constant(3) - scope-28
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Combine plan on edge <scope-49>
+Local Rearrange[tuple]{int}(false) - scope-62	->	 scope-57
+|   |
+|   Project[int][0] - scope-61
+|
+|---Package(BloomPackager)[tuple]{int} - scope-60
+# Plan on vertex
+POValueOutputTez - scope-59	->	 [scope-53]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-58
+Tez vertex scope-53
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-57	->	 scope-56
+|   |
+|   Project[int][0] - scope-35
+|
+|---b: New For Each(false,false)[bag] - scope-23
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][0] - scope-17
+    |   |
+    |   Cast[int] - scope-21
+    |   |
+    |   |---Project[bytearray][1] - scope-20
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex scope-56
+# Plan on vertex
+e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-48
+|
+|---e: New For Each(false,false,false)[bag] - scope-47
+    |   |
+    |   Project[int][2] - scope-41
+    |   |
+    |   Project[int][3] - scope-43
+    |   |
+    |   Project[int][1] - scope-45
+    |
+    |---d: New For Each(true,true)[tuple] - scope-40
+        |   |
+        |   Project[bag][1] - scope-38
+        |   |
+        |   Project[bag][2] - scope-39
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld
new file mode 100644
index 0000000..c5a2000
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-49	->	Tez vertex scope-56,Tez vertex scope-57,
+Tez vertex scope-57	->	Tez vertex scope-53,
+Tez vertex scope-53	->	Tez vertex scope-56,
+Tez vertex scope-56
+
+Tez vertex scope-49
+# Plan on vertex
+a: Split - scope-63
+|   |
+|   a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-15
+|   |
+|   |---a2: Filter[bag] - scope-11
+|       |   |
+|       |   Equal To[boolean] - scope-14
+|       |   |
+|       |   |---Project[int][0] - scope-12
+|       |   |
+|       |   |---Constant(4) - scope-13
+|   |
+|   d: BuildBloom Rearrange[tuple]{int}(false) - scope-36	->	[ scope-56, scope-57]
+|   |   |
+|   |   Project[int][0] - scope-37
+|   |
+|   |---a1: Filter[bag] - scope-26
+|       |   |
+|       |   Equal To[boolean] - scope-29
+|       |   |
+|       |   |---Project[int][0] - scope-27
+|       |   |
+|       |   |---Constant(3) - scope-28
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Combine plan on edge <scope-49>
+Local Rearrange[tuple]{int}(false) - scope-62	->	 scope-57
+|   |
+|   Project[int][0] - scope-61
+|
+|---Package(BloomPackager)[tuple]{int} - scope-60
+# Plan on vertex
+POValueOutputTez - scope-59	->	 [scope-53]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-58
+Tez vertex scope-53
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-57	->	 scope-56
+|   |
+|   Project[int][0] - scope-35
+|
+|---b: New For Each(false,false)[bag] - scope-23
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][0] - scope-17
+    |   |
+    |   Cast[int] - scope-21
+    |   |
+    |   |---Project[bytearray][1] - scope-20
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex scope-56
+# Plan on vertex
+e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-48
+|
+|---e: New For Each(false,false,false)[bag] - scope-47
+    |   |
+    |   Project[int][2] - scope-41
+    |   |
+    |   Project[int][3] - scope-43
+    |   |
+    |   Project[int][1] - scope-45
+    |
+    |---d: New For Each(true,true)[tuple] - scope-40
+        |   |
+        |   Project[bag][1] - scope-38
+        |   |
+        |   Project[bag][2] - scope-39
+        |
+        |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
new file mode 100644
index 0000000..1acbb13
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-43	->	Tez vertex scope-45,Tez vertex scope-47,Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-45,Tez vertex scope-47,
+Tez vertex scope-45	->	Tez vertex scope-51,
+Tez vertex scope-47	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-43
+# Plan on vertex
+a: Split - scope-58
+|   |
+|   e: BuildBloom Rearrange[tuple]{int}(false) - scope-36	->	[ scope-51, scope-52]
+|   |   |
+|   |   Project[int][0] - scope-37
+|   |
+|   |---d: Filter[bag] - scope-23
+|       |   |
+|       |   Greater Than[boolean] - scope-26
+|       |   |
+|       |   |---Project[int][0] - scope-24
+|       |   |
+|       |   |---Constant(10) - scope-25
+|   |
+|   POValueOutputTez - scope-44	->	 [scope-45, scope-47]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-52
+# Combine plan on edge <scope-43>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-45, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-45
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-32	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-33
+|
+|---b: Filter[bag] - scope-11
+    |   |
+    |   Less Than[boolean] - scope-14
+    |   |
+    |   |---Project[int][0] - scope-12
+    |   |
+    |   |---Constant(5) - scope-13
+    |
+    |---POValueInputTez - scope-46	<-	 scope-43
+Tez vertex scope-47
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-35
+|
+|---c: Filter[bag] - scope-17
+    |   |
+    |   Equal To[boolean] - scope-20
+    |   |
+    |   |---Project[int][0] - scope-18
+    |   |
+    |   |---Constant(10) - scope-19
+    |
+    |---POValueInputTez - scope-48	<-	 scope-43
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-42
+|
+|---e: New For Each(true,true,true)[tuple] - scope-41
+    |   |
+    |   Project[bag][1] - scope-38
+    |   |
+    |   Project[bag][2] - scope-39
+    |   |
+    |   Project[bag][3] - scope-40
+    |
+    |---e: Package(Packager)[tuple]{int} - scope-31
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld
new file mode 100644
index 0000000..1acbb13
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-43	->	Tez vertex scope-45,Tez vertex scope-47,Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52	->	Tez vertex scope-45,Tez vertex scope-47,
+Tez vertex scope-45	->	Tez vertex scope-51,
+Tez vertex scope-47	->	Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-43
+# Plan on vertex
+a: Split - scope-58
+|   |
+|   e: BuildBloom Rearrange[tuple]{int}(false) - scope-36	->	[ scope-51, scope-52]
+|   |   |
+|   |   Project[int][0] - scope-37
+|   |
+|   |---d: Filter[bag] - scope-23
+|       |   |
+|       |   Greater Than[boolean] - scope-26
+|       |   |
+|       |   |---Project[int][0] - scope-24
+|       |   |
+|       |   |---Constant(10) - scope-25
+|   |
+|   POValueOutputTez - scope-44	->	 [scope-45, scope-47]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-52
+# Combine plan on edge <scope-43>
+Local Rearrange[tuple]{int}(false) - scope-57	->	 scope-52
+|   |
+|   Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54	->	 [scope-45, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-45
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-32	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-33
+|
+|---b: Filter[bag] - scope-11
+    |   |
+    |   Less Than[boolean] - scope-14
+    |   |
+    |   |---Project[int][0] - scope-12
+    |   |
+    |   |---Constant(5) - scope-13
+    |
+    |---POValueInputTez - scope-46	<-	 scope-43
+Tez vertex scope-47
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-34	<-	 scope-52	->	 scope-51
+|   |
+|   Project[int][0] - scope-35
+|
+|---c: Filter[bag] - scope-17
+    |   |
+    |   Equal To[boolean] - scope-20
+    |   |
+    |   |---Project[int][0] - scope-18
+    |   |
+    |   |---Constant(10) - scope-19
+    |
+    |---POValueInputTez - scope-48	<-	 scope-43
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-42
+|
+|---e: New For Each(true,true,true)[tuple] - scope-41
+    |   |
+    |   Project[bag][1] - scope-38
+    |   |
+    |   Project[bag][2] - scope-39
+    |   |
+    |   Project[bag][3] - scope-40
+    |
+    |---e: Package(Packager)[tuple]{int} - scope-31
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index d01a4c2..801c195 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -88,6 +88,7 @@
         pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY);
         pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION);
         pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
+        pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY);
         pigServer = new PigServer(pc);
     }
 
@@ -178,6 +179,125 @@
     }
 
     @Test
+    public void testBloomJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x, y:int);" +
+                "b = load 'file:///tmp/input2' as (x, z:int);" +
+                "c = load 'file:///tmp/input2' as (x, w:int);" +
+                "d = join b by x, a by x, c by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z, w;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomJoinLeftOuter() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:chararray, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:chararray, z:int);" +
+                "d = join a by x left, b by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomJoinUnion() throws Exception {
+        // Left input from a union
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+                "b = union b, c;" +
+                "d = join a by x, b by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld");
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+        resetScope();
+        // Right input from a union
+        query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+                "b = union b, c;" +
+                "d = join b by x, a by x using 'bloom';" +
+                "e = foreach d generate a::x as x, y, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        // Needs shared edges and PIG-3856 to be a more optimal plan
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomJoinSplit() throws Exception {
+        // Left input from a split
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "a1 = filter a by x == 3;" +
+                "a2 = filter a by x == 4;" +
+                "d = join a1 by x, a2 by x, b by x using 'bloom';" +
+                "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld");
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+        resetScope();
+        // Right input from a split
+        query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "a1 = filter a by x == 3;" +
+                "a2 = filter a by x == 4;" +
+                "d = join b by x, a1 by x using 'bloom';" +
+                "e = foreach d generate a1::x as x, y, z;" +
+                "store a2 into 'file:///tmp/pigoutput/a2';" +
+                "store e into 'file:///tmp/pigoutput/e';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld");
+    }
+
+    @Test
+    public void testBloomSelfJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = filter a by x < 5;" +
+                "c = filter a by x == 10;" +
+                "d = filter a by x > 10;" +
+                "e = join b by x, c by x, d by x using 'bloom';" +
+                "store e into 'file:///tmp/pigoutput';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld");
+    }
+
+    @Test
     public void testSelfJoin() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x:int, y:int);" +