PIG-4963: Add a Bloom join (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1780431 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d0a7454..d49cdf9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@
IMPROVEMENTS
+PIG-4963: Add a Bloom join (rohini)
+
PIG-3938: Add LoadCaster to EvalFunc (knoguchi)
PIG-5105: Tez unit tests failing with "Argument list too long" (rohini)
diff --git a/src/docs/src/documentation/content/xdocs/basic.xml b/src/docs/src/documentation/content/xdocs/basic.xml
index b3a12c6..ea8b75b 100644
--- a/src/docs/src/documentation/content/xdocs/basic.xml
+++ b/src/docs/src/documentation/content/xdocs/basic.xml
@@ -6955,7 +6955,7 @@
<table>
<tr>
<td>
- <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n]; </p>
+ <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'bloom' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n]; </p>
</td>
</tr>
</table></section>
@@ -7004,7 +7004,16 @@
<p>Use to perform replicated joins (see <a href="perf.html#replicated-joins">Replicated Joins</a>).</p>
</td>
</tr>
-
+
+ <tr>
+ <td>
+ <p>'bloom'</p>
+ </td>
+ <td>
+ <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+ </td>
+ </tr>
+
<tr>
<td>
<p>'skewed'</p>
@@ -7142,7 +7151,7 @@
<tr>
<td>
<p>alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column
- [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; </p>
+ [USING 'replicated' | 'bloom' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; </p>
</td>
</tr>
</table>
@@ -7213,7 +7222,7 @@
</td>
</tr>
- <tr>
+ <tr>
<td>
<p>USING</p>
</td>
@@ -7230,8 +7239,18 @@
<p>Only left outer join is supported for replicated joins.</p>
</td>
</tr>
-
- <tr>
+
+ <tr>
+ <td>
+ <p>'bloom'</p>
+ </td>
+ <td>
+ <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+ <p>Full outer join is not supported for bloom joins.</p>
+ </td>
+ </tr>
+
+ <tr>
<td>
<p>'skewed'</p>
</td>
@@ -7324,6 +7343,13 @@
C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';
</source>
+<p>This example shows a bloom right outer join.</p>
+<source>
+A = LOAD 'large';
+B = LOAD 'small';
+C= JOIN A BY $0 RIGHT, B BY $0 USING 'bloom';
+</source>
+
<p>This example shows a skewed full outer join.</p>
<source>
A = LOAD 'studenttab' as (name, age, gpa);
diff --git a/src/docs/src/documentation/content/xdocs/perf.xml b/src/docs/src/documentation/content/xdocs/perf.xml
index 4163351..de9b694 100644
--- a/src/docs/src/documentation/content/xdocs/perf.xml
+++ b/src/docs/src/documentation/content/xdocs/perf.xml
@@ -1202,6 +1202,100 @@
</section>
<!-- END FRAGMENT REPLICATE JOINS-->
+<!-- BLOOM JOINS-->
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="bloom-joins">
+<title>Bloom Joins</title>
+<p>Bloom join is a special type of join where a bloom filter is constructed using join keys of one relation and
+used to filter records of the other relations before doing a regular hash join.
+The amount of data sent to the reducers will be a lot less depending up on the numbers of records that are filtered on the map side.
+Bloom join is very useful in cases where the number of matching records between relations in a join are comparatively less
+compared to the total records allowing many to be filtered before the join.
+Before bloom join was added as a type of join, same functionality was achieved by users by using
+the <a href="func.html#bloom">builtin bloom udfs</a> which is not as efficient and required more lines of code as well.
+Currently bloom join is only implemented in Tez execution mode. Builtin bloom udfs have to be used for other execution modes.</p>
+
+<section>
+<title>Usage</title>
+<p>Perform a bloom join with the USING clause (see <a href="basic.html#join-inner">JOIN (inner)</a> and <a href="basic.html#join-outer">JOIN (outer)</a>).
+In this example, a large relation is joined with two smaller relations. Note that the large relation comes first followed by the smaller relations.
+Bloom filter is built from join keys of the right most relation which is small and the filter is applied on the big and medium relations.
+None of the relations are required to fit into main memory. </p>
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+medium = LOAD 'medium_data' AS (m1,m2,m3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN big BY b1, medium BY m1, small BY s1 USING 'bloom';
+</source>
+
+<p>
+In the case of inner join and right outer join, the right most relation is used for building the bloom filter
+and the users are expected to specify the smaller dataset as the right most relation.
+But in the case of left outer join, the left most relation is used for building the bloom filter and is expected to be the smaller dataset.
+This is because all records of the outer relation should be in the result and no records can be filtered.
+If the left relation turns out to be the bigger dataset, it would not be as efficient to build the bloom filter on the bigger dataset.
+But it might still perform better than a regular join if it is able to filter lot of records from the right relation.
+</p>
+
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+small = LOAD 'small_data' AS (m1,m2,m3);
+
+C = JOIN small BY s1 LEFT, big BY b1 USING 'bloom';
+</source>
+</section>
+
+<section>
+<title>Conditions</title>
+<ul>
+<li>Bloom join cannot be used with a FULL OUTER join.</li>
+<li>If the the underlying data is sufficiently skewed, bloom join might not help. Skewed join can be considered for those cases.</li>
+</ul>
+</section>
+
+<section>
+<title>Tuning options</title>
+<p>
+There are multiple <a href="start.html#properties">pig properties</a> than can be configured to construct a more efficient bloom filter.
+See <a href="http://en.wikipedia.org/wiki/Bloom_filter">Bloom Filter</a> for a discussion of how to select the number of bits and the number of hash functions.
+Easier option would be to search for "bloom filter calculator" in a search engine and use one of the online bloom filter calculators available to arrive at the desired values.
+</p>
+<ul>
+<li>pig.bloomjoin.strategy - The valid values for this are 'map' and 'reduce'. Default value is map.
+Bloom join has two different kind of implementations to be more efficient in different cases.
+In general, there is an extra reduce step in the DAG for construction of the bloom filter(s).
+<ul>
+<li>map - In each map, bloom filters are computed on the join keys partitioned by the hashcode of the key
+with pig.bloomjoin.num.filters number of partitions.
+Bloom filters for each partition from different maps are then combined in the reducers producing one bloom filter per partition.
+The default value of pig.bloomjoin.num.filters is 1 for this strategy and so usually only one bloom filter is created.
+This is efficient and fast if there are smaller number of maps (<10) and the number of distinct keys are not too high.
+It can be faster with larger number of maps and even with bigger bloom vector sizes,
+ but the amount of data shuffled to the reducer for aggregation becomes huge making it inefficient.</li>
+<li>reduce - Join keys are sent from the map to the reducer partitioned by hashcode of the key with
+pig.bloomjoin.num.filters number of partitions. In the reducers, one bloom filter is then computed per partition.
+Number of reducers are set equal to the number of partitions allowing for each bloom filter to be computed in parallel.
+The default value of pig.bloomjoin.num.filters is 11 for this strategy.
+This is efficient for larger datasets with lot of maps or very large bloom vector size.
+In this case size of keys sent to the reducer is smaller than sending bloom filters to reducer for aggregation making it efficient.</li>
+</ul>
+</li>
+<li>pig.bloomjoin.num.filters - The number of bloom filters that will be created. Default is 1 for map strategy and 11 for reduce strategy.</li>
+<li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
+A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).</li>
+<li>pig.bloomjoin.hash.functions - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.types - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+Higher the number lower the false positives. Too high a value can increase the cpu time. Default value is 3.</li>
+</ul>
+</section>
+
+</section>
+<!-- END BLOOM JOINS-->
+
<!-- +++++++++++++++++++++++++++++++ -->
<!-- SKEWED JOINS-->
<section id="skewed-joins">
diff --git a/src/org/apache/pig/PigConfiguration.java b/src/org/apache/pig/PigConfiguration.java
index 36adb2c..a58c382 100644
--- a/src/org/apache/pig/PigConfiguration.java
+++ b/src/org/apache/pig/PigConfiguration.java
@@ -134,6 +134,58 @@
public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
/**
+ * Bloom join has two different kind of implementations.
+ * <ul>
+ * <li>map <br>
+ * In each map, bloom filters are computed on the join keys partitioned by
+ * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of
+ * partitions. Bloom filters from different maps are then combined in the
+ * reducer producing one bloom filter per partition. This is efficient and
+ * fast if there are smaller number of maps (<10) and the number of
+ * distinct keys are not too high. It can be faster with larger number of
+ * maps and even with bigger bloom vector sizes, but the amount of data
+ * shuffled to the reducer for aggregation becomes huge making it
+ * inefficient.</li>
+ * <li>reduce <br>
+ * Join keys are sent from the map to the reducer partitioned by hashcode of
+ * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One
+ * bloom filter is then created per partition. This is efficient for larger
+ * datasets with lot of maps or very large
+ * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case size of keys sent
+ * to the reducer is smaller than sending bloom filters to reducer for
+ * aggregation making it efficient.</li>
+ * </ul>
+ * Default value is map.
+ */
+ public static final String PIG_BLOOMJOIN_STRATEGY = "pig.bloomjoin.strategy";
+
+ /**
+ * The number of bloom filters that will be created.
+ * Default is 1 for map strategy and 11 for reduce strategy.
+ */
+ public static final String PIG_BLOOMJOIN_NUM_FILTERS = "pig.bloomjoin.num.filters";
+
+ /**
+ * The size in bytes of the bit vector to be used for the bloom filter.
+ * A bigger vector size will be needed when the number of distinct keys is higher.
+ * Default value is 1048576 (1MB).
+ */
+ public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = "pig.bloomjoin.vectorsize.bytes";
+
+ /**
+ * The type of hash function to use. Valid values are jenkins and murmur.
+ * Default is murmur.
+ */
+ public static final String PIG_BLOOMJOIN_HASH_TYPE = "pig.bloomjoin.hash.type";
+
+ /**
+ * The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+ * Higher the number lower the false positives. Too high a value can increase the cpu time.
+ * Default value is 3.
+ */
+ public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
+
+ /**
* This key used to control the maximum size loaded into
* the distributed cache when doing fragment-replicated join
*/
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
index cacba40..ae8d357 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
@@ -1116,7 +1116,9 @@
try{
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
- if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN
+ || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+ // Bloom join is not implemented in mapreduce mode and falls back to regular join
curMROp.markRegularJoin();
} else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
index 550ff65..1611a8f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
@@ -37,6 +37,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -160,7 +161,7 @@
// tuples out of the getnext() call of POJoinPackage
// In this case, we process till we see EOP from
// POJoinPacakage.getNext()
- if (pack.getPkgr() instanceof JoinPackager)
+ if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
index 9a70b81..c082044 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
@@ -21,14 +21,16 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -105,7 +107,7 @@
public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
endOfAllInputFlag = true;
}
-
+
@Override
public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
endOfAllInputFlag = true;
@@ -122,6 +124,13 @@
}
}
+ @Override
+ public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
+ if (lr instanceof POBuildBloomRearrangeTez) {
+ endOfAllInputFlag = true;
+ }
+ super.visitLocalRearrange(lr);
+ }
/**
* @return if end of all input is present
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
index 1ff1abd..0e35273 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
@@ -441,6 +441,10 @@
public void reset() {
}
+ public boolean isEndOfAllInput() {
+ return parentPlan.endOfAllInput;
+ }
+
/**
* @return PigProgressable stored in threadlocal
*/
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
index 8992b6e..805f21a 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
@@ -51,13 +51,13 @@
protected DataBag[] bags;
public static enum PackageType {
- GROUP, JOIN
+ GROUP, JOIN, BLOOMJOIN
};
protected transient Illustrator illustrator = null;
// The key being worked on
- Object key;
+ protected Object key;
// marker to indicate if key is a tuple
protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@
protected boolean isKeyCompound = false;
// key's type
- byte keyType;
+ protected byte keyType;
// The number of inputs to this
// co-group. 0 indicates a distinct, which means there will only be a
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 6e88528..5ed3ee3 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -479,7 +479,7 @@
POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
byte keyType = lr.getKeyType();
setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
// In case of secondary key sort, main key type is the actual key type
@@ -540,26 +540,36 @@
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
out.setUserPayload(payLoad);
+ in.setUserPayload(payLoad);
+ // Remove combiner and reset payload
if (!combinePlan.isEmpty()) {
boolean noCombineInReducer = false;
+ boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
- if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+ if (edge.getCombinerInReducer() != null) {
+ noCombineInReducer = !edge.getCombinerInReducer();
+ } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
} else {
noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
}
- if (noCombineInReducer) {
+ if (noCombineInReducer || noCombineInMapper) {
log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
conf.unset("pig.combinePlan");
conf.unset("pig.combine.package");
conf.unset("pig.map.keytype");
- payLoad = TezUtils.createUserPayloadFromConf(conf);
+ UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
+ if (noCombineInMapper) {
+ out.setUserPayload(payLoadWithoutCombiner);
+ }
+ if (noCombineInReducer) {
+ in.setUserPayload(payLoadWithoutCombiner);
+ }
}
}
- in.setUserPayload(payLoad);
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
@@ -717,7 +727,7 @@
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
if (lr.isConnectedToPackage()
- && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+ && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
localRearrangeMap.put((int) lr.getIndex(), inputKey);
if (isVertexGroup) {
isMergedInput = true;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
index 6dc118f..c112528 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
@@ -32,10 +32,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.hash.Hash;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
@@ -44,8 +46,10 @@
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -82,7 +86,10 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -110,6 +117,7 @@
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
@@ -167,6 +175,10 @@
private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
+ // Contains the inputs to operator like join, with the list maintaining the
+ // same order of join from left to right
+ private Map<TezOperator, List<TezOperator>> inputsMap;
+
public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -175,6 +187,8 @@
private boolean optimisticFileConcatenation = false;
private List<String> readOnceLoadFuncs = null;
+ private Configuration conf;
+
private POLocalRearrangeTezFactory localRearrangeFactory;
public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -184,6 +198,7 @@
this.pigContext = pigContext;
pigProperties = pigContext.getProperties();
+ conf = ConfigurationUtil.toConfiguration(pigProperties, false);
splitsSeen = Maps.newHashMap();
tezPlan = new TezOperPlan();
nig = NodeIdGenerator.getGenerator();
@@ -197,6 +212,7 @@
scope = roots.get(0).getOperatorKey().getScope();
localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
phyToTezOpMap = Maps.newHashMap();
+ inputsMap = Maps.newHashMap();
fileConcatenationThreshold = Integer.parseInt(pigProperties
.getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -894,6 +910,7 @@
public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
try {
blocking();
+ inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs)));
TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
curTezOp.setRequestedParallelism(op.getRequestedParallelism());
if (op.isCross()) {
@@ -1340,6 +1357,9 @@
} else if (op.getNumInps() > 1) {
curTezOp.markCogroup();
}
+ } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+ curTezOp.markRegularJoin();
+ addBloomToJoin(op, curTezOp);
}
} catch (Exception e) {
int errCode = 2034;
@@ -1348,6 +1368,132 @@
}
}
+ private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException {
+
+ List<TezOperator> inputs = inputsMap.get(curTezOp);
+ TezOperator buildBloomOp;
+ List<TezOperator> applyBloomOps = new ArrayList<>();
+
+ String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
+ boolean createBloomInMap = "map".equals(strategy);
+ if (!createBloomInMap && !strategy.equals("reduce")) {
+ throw new PlanException(new IllegalArgumentException(
+ "Invalid value for "
+ + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " - "
+ + strategy + ". Valid values are map and reduce"));
+ }
+ int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
+ int vectorSizeBytes = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
+ int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf);
+ int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
+
+ // We build bloom of the right most input and apply the bloom filter on the left inputs by default.
+ // But in case of left outer join we build bloom of the left input and use it on the right input
+ boolean[] inner = op.getPkgr().getInner();
+ boolean skipNullKeys = true;
+ if (inner[inner.length - 1]) { // inner has from right to left while inputs has from left to right
+ buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input
+ for (int i = 0; i < (inner.length - 1); i++) {
+ applyBloomOps.add(inputs.get(i));
+ }
+ skipNullKeys = inner[0];
+ } else {
+ // Left outer join
+ skipNullKeys = false;
+ buildBloomOp = inputs.get(0); // Bloom filter is built from left most input
+ for (int i = 1; i < inner.length; i++) {
+ applyBloomOps.add(inputs.get(i));
+ }
+ }
+
+ // Add BuildBloom operator to the input
+ POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0);
+ POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
+ bbr.setSkipNullKeys(skipNullKeys);
+ buildBloomOp.plan.remove(lr);
+ buildBloomOp.plan.addAsLeaf(bbr);
+
+ // Add a new reduce vertex that will construct the final bloom filter
+ // - by combining the bloom filters from the buildBloomOp input tasks in the map strategy
+ // - or directly from the keys from the buildBloomOp input tasks in the reduce strategy
+ TezOperator combineBloomOp = getTezOp();
+ tezPlan.add(combineBloomOp);
+ combineBloomOp.markBuildBloom();
+ // Explicitly set the parallelism for the new vertex to number of bloom filters.
+ // Auto parallelism will bring it down based on the actual output size
+ combineBloomOp.setEstimatedParallelism(numBloomFilters);
+ // We don't want parallelism to be changed during the run by grace auto parallelism
+ // It will take the whole input size and estimate way higher
+ combineBloomOp.setDontEstimateParallelism(true);
+
+ String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
+ TezEdgeDescriptor edge = new TezEdgeDescriptor();
+ TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
+ bbr.setBloomOutputKey(combineBloomOpKey);
+
+
+ POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
+ pkg.setNumInps(1);
+ BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);;
+ pkgr.setKeyType(DataType.INTEGER);
+ pkg.setPkgr(pkgr);
+ POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ combineBloomOp.plan.addAsLeaf(pkg);
+ combineBloomOp.plan.addAsLeaf(combineBloomOutput);
+
+ edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+ edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+
+ // Add combiner as well.
+ POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+ BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+ combinerPkgr.setCombiner(true);
+ combinerPkgr.setKeyType(DataType.INTEGER);
+ pkg_c.setPkgr(combinerPkgr);
+ pkg_c.setNumInps(1);
+ edge.combinePlan.addAsLeaf(pkg_c);
+ POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+ prjKey.setResultType(DataType.INTEGER);
+ List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(prjKey);
+ clrInps.add(pp);
+ POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+ clr.setOutputKey(combineBloomOpKey);
+ edge.combinePlan.addAsLeaf(clr);
+
+ if (createBloomInMap) {
+ // No combiner needed on map as there will be only one bloom filter per map for each partition
+ // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager
+ edge.setCombinerInMap(false);
+ edge.setCombinerInReducer(true);
+ } else {
+ pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+ // Do distinct of the keys on the map side to reduce data sent to reducers.
+ // In case of reduce, not adding a combiner and doing the distinct during reduce itself.
+ // If needed one can be added later
+ edge.setCombinerInMap(true);
+ edge.setCombinerInReducer(false);
+ }
+
+ // Broadcast the final bloom filter to other inputs
+ for (TezOperator applyBloomOp : applyBloomOps) {
+ applyBloomOp.markFilterBloom();
+ lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
+ POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters);
+ applyBloomOp.plan.remove(lr);
+ applyBloomOp.plan.addAsLeaf(bfr);
+ bfr.setInputKey(combineBloomOpKey);
+ edge = new TezEdgeDescriptor();
+ edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+ edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+ TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge);
+ combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
+ }
+
+ }
+
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
try{
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
index cfdd3f1..97f94d5 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
@@ -31,9 +31,13 @@
* Descriptor for Tez edge. It holds combine plan as well as edge properties.
*/
public class TezEdgeDescriptor implements Serializable {
- // Combiner runs on both input and output of Tez edge.
- transient public PhysicalPlan combinePlan;
+
+ public transient PhysicalPlan combinePlan;
private boolean needsDistinctCombiner;
+ // Combiner runs on both input and output of Tez edge by default
+ // It can be configured to run only in output(map) or input(reduce)
+ private Boolean combinerInMap;
+ private Boolean combinerInReducer;
public String inputClassName;
public String outputClassName;
@@ -74,6 +78,22 @@
needsDistinctCombiner = nic;
}
+ public Boolean getCombinerInMap() {
+ return combinerInMap;
+ }
+
+ public void setCombinerInMap(Boolean combinerInMap) {
+ this.combinerInMap = combinerInMap;
+ }
+
+ public Boolean getCombinerInReducer() {
+ return combinerInReducer;
+ }
+
+ public void setCombinerInReducer(Boolean combinerInReducer) {
+ this.combinerInReducer = combinerInReducer;
+ }
+
public boolean isUseSecondaryKey() {
return useSecondaryKey;
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
index 4d134ae..5d8ade9 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
@@ -181,7 +181,11 @@
// Indicate if this job is a native job
NATIVE,
// Indicate if this job does rank counter
- RANK_COUNTER;
+ RANK_COUNTER,
+ // Indicate if this job constructs bloom filter
+ BUILDBLOOM,
+ // Indicate if this job applies bloom filter
+ FILTERBLOOM;
};
// Features in the job/vertex. Mostly will be only one feature.
@@ -453,6 +457,22 @@
feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
}
+ public boolean isBuildBloom() {
+ return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal());
+ }
+
+ public void markBuildBloom() {
+ feature.set(OPER_FEATURE.BUILDBLOOM.ordinal());
+ }
+
+ public boolean isFilterBloom() {
+ return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal());
+ }
+
+ public void markFilterBloom() {
+ feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
+ }
+
public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
for (OPER_FEATURE opf : OPER_FEATURE.values()) {
if (excludeFeatures != null && excludeFeatures.contains(opf)) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
index bf0a837..21ccbd3 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
@@ -31,6 +31,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -161,7 +162,7 @@
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
- if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
+ if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
return;
}
loRearrangeFound++;
@@ -180,7 +181,9 @@
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
- Integer index = Integer.valueOf(lrearrange.getIndex());
+ // For BloomPackager there is only one input, but the
+ // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero
+ Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex());
if(keyInfo.get(index) != null) {
if (isPOSplit) {
// Case of POSplit having more than one input in case of self join or union
@@ -197,12 +200,20 @@
}
- keyInfo.put(index,
- new Pair<Boolean, Map<Integer, Integer>>(
- lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
- pkg.getPkgr().setKeyInfo(keyInfo);
- pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
- pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+ if (pkg.getPkgr() instanceof BloomPackager ) {
+ keyInfo.put(index,
+ new Pair<Boolean, Map<Integer, Integer>>(
+ Boolean.FALSE, new HashMap<Integer, Integer>()));
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ } else {
+ keyInfo.put(index,
+ new Pair<Boolean, Map<Integer, Integer>>(
+ lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+ pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+ }
+
}
/**
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
new file mode 100644
index 0000000..7e3e325
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class BloomPackager extends Packager {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean bloomCreatedInMap;
+ private int vectorSizeBytes;
+ private int numHash;
+ private int hashType;
+ private byte bloomKeyType;
+ private boolean isCombiner;
+
+ private transient ByteArrayOutputStream baos;
+ private transient Iterator<Object> distinctKeyIter;
+
+ public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+ int numHash, int hashType) {
+ super();
+ this.bloomCreatedInMap = bloomCreatedInMap;
+ this.vectorSizeBytes = vectorSizeBytes;
+ this.numHash = numHash;
+ this.hashType = hashType;
+ }
+
+ public void setBloomKeyType(byte keyType) {
+ bloomKeyType = keyType;
+ }
+
+ public void setCombiner(boolean isCombiner) {
+ this.isCombiner = isCombiner;
+ }
+
+ @Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
+ @Override
+ public Result getNext() throws ExecException {
+ try {
+ if (bloomCreatedInMap) {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+ // Same function for combiner and reducer
+ return combineBloomFilters();
+ } else {
+ if (isCombiner) {
+ return getDistinctBloomKeys();
+ } else {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+ return createBloomFilter();
+ }
+ }
+ } catch (IOException e) {
+ throw new ExecException("Error while constructing final bloom filter", e);
+ }
+ }
+
+ private Result combineBloomFilters() throws IOException {
+ // We get a bag of bloom filters. combine them into one
+ Iterator<Tuple> iter = bags[0].iterator();
+ Tuple tup = iter.next();
+ DataByteArray bloomBytes = (DataByteArray) tup.get(0);
+ BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+ while (iter.hasNext()) {
+ tup = iter.next();
+ bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+ }
+
+ Object partition = key;
+ detachInput(); // Free up the key and bags reference
+
+ return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
+ }
+
+ private Result createBloomFilter() throws IOException {
+ // We get a bag of keys. Create a bloom filter from them
+ // First do distinct of the keys. Not using DistinctBag as memory should not be a problem.
+ HashSet<Object> bloomKeys = new HashSet<>();
+ Iterator<Tuple> iter = bags[0].iterator();
+ while (iter.hasNext()) {
+ bloomKeys.add(iter.next().get(0));
+ }
+
+ Object partition = key;
+ detachInput(); // Free up the key and bags reference
+
+ BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+ for (Object bloomKey: bloomKeys) {
+ Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
+ bloomFilter.add(k);
+ }
+ bloomKeys = null;
+ return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64);
+
+ }
+
+ private Result getSerializedBloomFilter(Object partition,
+ BloomFilter bloomFilter, int serializedSize) throws ExecException,
+ IOException {
+ if (baos == null) {
+ baos = new ByteArrayOutputStream(serializedSize);
+ }
+ baos.reset();
+ DataOutputStream dos = new DataOutputStream(baos);
+ bloomFilter.write(dos);
+ dos.flush();
+
+ Tuple res = mTupleFactory.newTuple(2);
+ res.set(0, partition);
+ res.set(1, new DataByteArray(baos.toByteArray()));
+
+ Result r = new Result();
+ r.result = res;
+ r.returnStatus = POStatus.STATUS_OK;
+ return r;
+ }
+
+ private Result getDistinctBloomKeys() throws ExecException {
+ if (distinctKeyIter == null) {
+ HashSet<Object> bloomKeys = new HashSet<>();
+ Iterator<Tuple> iter = bags[0].iterator();
+ while (iter.hasNext()) {
+ bloomKeys.add(iter.next().get(0));
+ }
+ distinctKeyIter = bloomKeys.iterator();
+ }
+ while (distinctKeyIter.hasNext()) {
+ Tuple res = mTupleFactory.newTuple(2);
+ res.set(0, key);
+ res.set(1, distinctKeyIter.next());
+
+ Result r = new Result();
+ r.result = res;
+ r.returnStatus = POStatus.STATUS_OK;
+ return r;
+ }
+ distinctKeyIter = null;
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
new file mode 100644
index 0000000..82b599d
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
+ private static final long serialVersionUID = 1L;
+
+ private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
+ private String inputKey;
+ private transient KeyValueReader reader;
+ private transient String cacheKey;
+ private int numBloomFilters;
+ private transient BloomFilter[] bloomFilters;
+
+ public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
+ super(lr);
+ this.numBloomFilters = numBloomFilters;
+ }
+
+ public void setInputKey(String inputKey) {
+ this.inputKey = inputKey;
+ }
+
+ @Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ cacheKey = "bloom-" + inputKey;
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ inputsToSkip.add(inputKey);
+ }
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException {
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ bloomFilters = (BloomFilter[]) cacheValue;
+ return;
+ }
+ LogicalInput input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + inputKey + " is missing");
+ }
+ try {
+ reader = (KeyValueReader) input.getReader();
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ while (reader.next()) {
+ if (bloomFilters == null) {
+ bloomFilters = new BloomFilter[numBloomFilters];
+ }
+ Tuple val = (Tuple) reader.getCurrentValue();
+ int index = (int) val.get(0);
+ bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+ }
+ ObjectCache.getInstance().cache(cacheKey, bloomFilters);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+
+ // If there is no bloom filter, then it means right input was empty
+ // Skip processing
+ if (bloomFilters == null) {
+ return RESULT_EOP;
+ }
+
+ while (true) {
+ res = super.getRearrangedTuple();
+ try {
+ switch (res.returnStatus) {
+ case POStatus.STATUS_OK:
+ if (illustrator == null) {
+ Tuple result = (Tuple) res.result;
+ Byte index = (Byte) result.get(0);
+
+ // Skip the record if key is not in the bloom filter
+ if (!isKeyInBloomFilter(result.get(1))) {
+ continue;
+ }
+ PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+ NullableTuple val = new NullableTuple((Tuple)result.get(2));
+ key.setIndex(index);
+ val.setIndex(index);
+ writer.write(key, val);
+ } else {
+ illustratorMarkup(res.result, res.result, 0);
+ }
+ continue;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ case POStatus.STATUS_ERR:
+ default:
+ return res;
+ }
+ } catch (IOException ioe) {
+ int errCode = 2135;
+ String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage();
+ throw new ExecException(msg, errCode, ioe);
+ }
+ }
+ }
+
+ private boolean isKeyInBloomFilter(Object key) throws ExecException {
+ if (key == null) {
+ // Null values are dropped in a inner join and in the case of outer join,
+ // POBloomFilterRearrangeTez is only in the plan on the non outer relation.
+ // So just skip them
+ return false;
+ }
+ if (bloomFilters.length == 1) {
+ // Skip computing hashcode
+ Key k = new Key(DataType.toBytes(key, keyType));
+ return bloomFilters[0].membershipTest(k);
+ } else {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ BloomFilter filter = bloomFilters[partition];
+ if (filter != null) {
+ Key k = new Key(DataType.toBytes(key, keyType));
+ return filter.membershipTest(k);
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
+ return (POBloomFilterRearrangeTez) super.clone();
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "BloomFilter Rearrange" + "["
+ + DataType.findTypeName(resultType) + "]" + "{"
+ + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+ }
+
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
new file mode 100644
index 0000000..eb8a612
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * This operator writes out the key value for the hash join reduce operation similar to POLocalRearrangeTez.
+ * In addition, it also writes out the bloom filter constructed from the join keys
+ * in the case of bloomjoin map strategy or join keys themselves in case of reduce strategy.
+ *
+ * Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
+ * It also allows us to have lower false positives with smaller vector sizes.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
+
+ public static final String DEFAULT_BLOOM_STRATEGY = "map";
+ public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11;
+ public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3;
+ public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur";
+ public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024;
+
+ private String bloomOutputKey;
+ private boolean skipNullKeys = false;
+ private boolean createBloomInMap;
+ private int numBloomFilters;
+ private int vectorSizeBytes;
+ private int numHash;
+ private int hashType;
+
+ private transient BloomFilter[] bloomFilters;
+ private transient KeyValueWriter bloomWriter;
+ private transient PigNullableWritable nullKey;
+ private transient Tuple bloomValue;
+ private transient NullableTuple bloomNullableTuple;
+
+ public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
+ boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
+ int numHash, int hashType) {
+ super(lr);
+ this.createBloomInMap = createBloomInMap;
+ this.numBloomFilters = numBloomFilters;
+ this.vectorSizeBytes = vectorSizeBytes;
+ this.numHash = numHash;
+ this.hashType = hashType;
+ }
+
+ public static int getNumBloomFilters(Configuration conf) {
+ if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) {
+ return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1);
+ } else {
+ return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE);
+ }
+ }
+
+ public void setSkipNullKeys(boolean skipNullKeys) {
+ this.skipNullKeys = skipNullKeys;
+ }
+
+ public void setBloomOutputKey(String bloomOutputKey) {
+ this.bloomOutputKey = bloomOutputKey;
+ }
+
+ @Override
+ public boolean containsOutputKey(String key) {
+ if(super.containsOutputKey(key)) {
+ return true;
+ }
+ return bloomOutputKey.equals(key);
+ }
+
+ @Override
+ public String[] getTezOutputs() {
+ return new String[] { outputKey, bloomOutputKey };
+ }
+
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
+ if (oldOutputKey.equals(outputKey)) {
+ outputKey = newOutputKey;
+ } else if (oldOutputKey.equals(bloomOutputKey)) {
+ bloomOutputKey = newOutputKey;
+ }
+ }
+
+ @Override
+ public void attachOutputs(Map<String, LogicalOutput> outputs,
+ Configuration conf) throws ExecException {
+ super.attachOutputs(outputs, conf);
+ LogicalOutput output = outputs.get(bloomOutputKey);
+ if (output == null) {
+ throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
+ }
+ try {
+ bloomWriter = (KeyValueWriter) output.getWriter();
+ LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ bloomFilters = new BloomFilter[numBloomFilters];
+ bloomValue = mTupleFactory.newTuple(1);
+ bloomNullableTuple = new NullableTuple(bloomValue);
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+
+ PigNullableWritable key;
+ while (true) {
+ res = super.getRearrangedTuple();
+ try {
+ switch (res.returnStatus) {
+ case POStatus.STATUS_OK:
+ if (illustrator == null) {
+ Tuple result = (Tuple) res.result;
+ Byte index = (Byte) result.get(0);
+
+ Object keyObj = result.get(1);
+ if (keyObj != null) {
+ key = HDataType.getWritableComparableTypes(keyObj, keyType);
+ // null keys cannot be part of bloom filter
+ // Since they are also dropped during join we can skip them
+ if (createBloomInMap) {
+ addKeyToBloomFilter(keyObj);
+ } else {
+ writeJoinKeyForBloom(keyObj);
+ }
+ } else if (skipNullKeys) {
+ // Inner join. So don't bother writing null key
+ continue;
+ } else {
+ if (nullKey == null) {
+ nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
+ }
+ key = nullKey;
+ }
+
+ NullableTuple val = new NullableTuple((Tuple)result.get(2));
+ key.setIndex(index);
+ val.setIndex(index);
+ writer.write(key, val);
+ } else {
+ illustratorMarkup(res.result, res.result, 0);
+ }
+ continue;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ if (this.parentPlan.endOfAllInput && createBloomInMap) {
+ // In case of Split will get EOP after every record.
+ // So check for endOfAllInput
+ writeBloomFilters();
+ }
+ case POStatus.STATUS_ERR:
+ default:
+ return res;
+ }
+ } catch (IOException ioe) {
+ int errCode = 2135;
+ String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage();
+ throw new ExecException(msg, errCode, ioe);
+ }
+ }
+ }
+
+ private void addKeyToBloomFilter(Object key) throws ExecException {
+ Key k = new Key(DataType.toBytes(key, keyType));
+ if (bloomFilters.length == 1) {
+ if (bloomFilters[0] == null) {
+ bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+ }
+ bloomFilters[0].add(k);
+ } else {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ BloomFilter filter = bloomFilters[partition];
+ if (filter == null) {
+ filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+ bloomFilters[partition] = filter;
+ }
+ filter.add(k);
+ }
+ }
+
+ private void writeJoinKeyForBloom(Object key) throws IOException {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ bloomValue.set(0, key);
+ bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+ }
+
+ private void writeBloomFilters() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64);
+ for (int i = 0; i < bloomFilters.length; i++) {
+ if (bloomFilters[i] != null) {
+ DataOutputStream dos = new DataOutputStream(baos);
+ bloomFilters[i].write(dos);
+ dos.flush();
+ bloomValue.set(0, new DataByteArray(baos.toByteArray()));
+ bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple);
+ baos.reset();
+ }
+ }
+ }
+
+ @Override
+ public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
+ return (POBuildBloomRearrangeTez) super.clone();
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "BuildBloom Rearrange" + "["
+ + DataType.findTypeName(resultType) + "]" + "{"
+ + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
+ }
+
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
index 0f07483..1552103 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
@@ -71,8 +71,8 @@
}
}
- public String getOutputKey() {
- return outputKey;
+ public boolean containsOutputKey(String key) {
+ return outputKey.equals(key);
}
public void setOutputKey(String outputKey) {
@@ -122,6 +122,10 @@
}
}
+ protected Result getRearrangedTuple() throws ExecException {
+ return super.getNextTuple();
+ }
+
@Override
public Result getNextTuple() throws ExecException {
res = super.getNextTuple();
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
index 2914128..99ebf1c 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
@@ -129,7 +129,9 @@
finished[i] = !readers.get(i).next();
}
- this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+ this.readOnceOneBag = (numInputs == 1)
+ && (pkgr instanceof CombinerPackager
+ || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
if (readOnceOneBag) {
readOnce[0] = true;
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
index a14a9b7..d0b994d 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
@@ -69,6 +69,11 @@
}
for (TezOperator from : predecessors) {
+ PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+ if (!combinePlan.isEmpty()) {
+ // Cases like bloom join have combine plan already set
+ continue;
+ }
List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
if (rearranges.isEmpty()) {
continue;
@@ -77,7 +82,7 @@
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
@@ -90,7 +95,6 @@
// Detected the POLocalRearrange -> POPackage pattern. Let's add
// combiner if possible.
- PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
if(!combinePlan.isEmpty()) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
index 6e35c37..b9de7d0 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
@@ -123,8 +123,8 @@
boolean overrideRequestedParallelism = false;
if (parallelism != -1
&& autoParallelismEnabled
- && tezOp.isIntermediateReducer()
&& !tezOp.isDontEstimateParallelism()
+ && tezOp.isIntermediateReducer()
&& tezOp.isOverrideIntermediateParallelism()) {
overrideRequestedParallelism = true;
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
index c56f83f..caf1786 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
@@ -75,7 +75,7 @@
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
index 5020df8..d17a75d 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
@@ -30,6 +30,8 @@
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
- tezOp.setEstimatedParallelism(-1);
+ if (!tezOp.isDontEstimateParallelism()) {
+ tezOp.setEstimatedParallelism(-1);
+ }
}
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
index c8afd61..d892e9e 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
@@ -97,16 +97,16 @@
bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+ // If we have already estimated parallelism, use that one
+ if (tezOper.getEstimatedParallelism() != -1) {
+ return tezOper.getEstimatedParallelism();
+ }
+
// If parallelism is set explicitly, respect it
if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
return tezOper.getRequestedParallelism();
}
- // If we have already estimated parallelism, use that one
- if (tezOper.getEstimatedParallelism()!=-1) {
- return tezOper.getEstimatedParallelism();
- }
-
List<TezOperator> preds = plan.getPredecessors(tezOper);
if (preds==null) {
throw new IOException("Cannot estimate parallelism for source vertex");
diff --git a/src/org/apache/pig/newplan/logical/relational/LOJoin.java b/src/org/apache/pig/newplan/logical/relational/LOJoin.java
index d1d5d8c..5698bb5 100644
--- a/src/org/apache/pig/newplan/logical/relational/LOJoin.java
+++ b/src/org/apache/pig/newplan/logical/relational/LOJoin.java
@@ -38,6 +38,7 @@
*/
public static enum JOINTYPE {
HASH, // Hash Join
+ BLOOM, // Bloom Join
REPLICATED, // Fragment Replicated join
SKEWED, // Skewed Join
MERGE, // Sort Merge Join
diff --git a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
index 99df7f0..7faa1fd 100644
--- a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
+++ b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
@@ -1414,7 +1414,7 @@
return;
}
- else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+ else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){
POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
currentPlan.add(fe);
@@ -1425,7 +1425,20 @@
e.getErrorCode(),e.getErrorSource(),e);
}
logToPhyMap.put(loj, fe);
- poPackage.getPkgr().setPackageType(PackageType.JOIN);
+ if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
+ if (innerFlags.length == 2) {
+ if (innerFlags[0] == false && innerFlags[1] == false) {
+ throw new LogicalToPhysicalTranslatorException(
+ "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
+ ". Bloom join cannot be used with a FULL OUTER join.",
+ 1109,
+ PigException.INPUT);
+ }
+ }
+ poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
+ } else {
+ poPackage.getPkgr().setPackageType(PackageType.JOIN);
+ }
}
translateSoftLinks(loj);
}
diff --git a/src/org/apache/pig/parser/LogicalPlanBuilder.java b/src/org/apache/pig/parser/LogicalPlanBuilder.java
index 4cc2e18..b160585 100644
--- a/src/org/apache/pig/parser/LogicalPlanBuilder.java
+++ b/src/org/apache/pig/parser/LogicalPlanBuilder.java
@@ -1788,6 +1788,8 @@
return JOINTYPE.REPLICATED;
} else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
return LOJoin.JOINTYPE.HASH;
+ } else if( modifier.equalsIgnoreCase( "bloom" ) ) {
+ return LOJoin.JOINTYPE.BLOOM;
} else if( modifier.equalsIgnoreCase( "skewed" ) ) {
return JOINTYPE.SKEWED;
} else if (modifier.equalsIgnoreCase("merge")) {
@@ -1796,7 +1798,7 @@
return JOINTYPE.MERGESPARSE;
} else {
throw new ParserValidationException( intStream, loc,
- "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
+ "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
}
}
diff --git a/src/org/apache/pig/tools/pigstats/ScriptState.java b/src/org/apache/pig/tools/pigstats/ScriptState.java
index 6eff0d5..e755f1c 100644
--- a/src/org/apache/pig/tools/pigstats/ScriptState.java
+++ b/src/org/apache/pig/tools/pigstats/ScriptState.java
@@ -133,6 +133,8 @@
MERGE_SPARSE_JOIN,
REPLICATED_JOIN,
SKEWED_JOIN,
+ BUILD_BLOOM,
+ FILTER_BLOOM,
HASH_JOIN,
COLLECTED_GROUP,
MERGE_COGROUP,
@@ -312,7 +314,7 @@
maxScriptSize = Integer.valueOf(prop);
}
}
-
+
this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize)
: script;
@@ -485,6 +487,10 @@
public void visit(LOJoin op) {
if (op.getJoinType() == JOINTYPE.HASH) {
feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+ } else if (op.getJoinType() == JOINTYPE.BLOOM) {
+ feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+ feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+ feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
} else if (op.getJoinType() == JOINTYPE.MERGE) {
feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
} else if (op.getJoinType() == JOINTYPE.MERGESPARSE) {
@@ -506,6 +512,7 @@
feature.set(PIG_FEATURE.RANK.ordinal());
}
+ @Override
public void visit(LOSort op) {
feature.set(PIG_FEATURE.ORDER_BY.ordinal());
}
diff --git a/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java b/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
index fd4256b..716f079 100644
--- a/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
+++ b/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
@@ -275,6 +275,12 @@
if (tezOp.isRegularJoin()) {
feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
}
+ if (tezOp.isBuildBloom()) {
+ feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+ }
+ if (tezOp.isFilterBloom()) {
+ feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
+ }
if (tezOp.isUnion()) {
feature.set(PIG_FEATURE.UNION.ordinal());
}
diff --git a/test/e2e/pig/build.xml b/test/e2e/pig/build.xml
index a0dc32d..1ec9cf6 100644
--- a/test/e2e/pig/build.xml
+++ b/test/e2e/pig/build.xml
@@ -137,6 +137,7 @@
<path path="${test.location}/tests/multiquery.conf"/>
<path path="${test.location}/tests/negative.conf"/>
<path path="${test.location}/tests/nightly.conf"/>
+ <path path="${test.location}/tests/join.conf"/>
<path path="${test.location}/tests/streaming.conf"/>
<path path="${test.location}/tests/streaming_local.conf"/>
<path path="${test.location}/tests/turing_jython.conf"/>
diff --git a/test/e2e/pig/tests/join.conf b/test/e2e/pig/tests/join.conf
new file mode 100644
index 0000000..97f6c05
--- /dev/null
+++ b/test/e2e/pig/tests/join.conf
@@ -0,0 +1,310 @@
+#!/usr/bin/env perl
+############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+###############################################################################
+
+$cfg = {
+ 'driver' => 'Pig',
+
+ 'groups' => [
+ {
+ 'name' => 'BloomJoin_Map',
+ 'execonly' => 'tez',
+ 'tests' => [
+ {
+ # Tuple join key
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+ },
+ {
+ # bytearray join key
+ 'num' => 2,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left outer join and chararray join key
+ 'num' => 3,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Right outer join
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a union
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Right input from a union and integer join key
+ 'num' => 6,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a split
+ 'num' => 7,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ },
+ {
+ # Right input from a split
+ 'num' => 8,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ },
+ ] # end of tests
+ },
+ {
+ 'name' => 'BloomJoin_Reduce',
+ 'execonly' => 'tez',
+ 'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+ 'tests' => [
+ {
+ # Tuple join key
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+ },
+ {
+ # bytearray join key
+ 'num' => 2,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left outer join and chararray join key
+ 'num' => 3,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Right outer join
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a union
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Right input from a union and integer join key
+ 'num' => 6,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a split
+ 'num' => 7,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ },
+ {
+ # Right input from a split
+ 'num' => 8,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ },
+ ] # end of tests
+ }
+ ] # end of groups
+};
\ No newline at end of file
diff --git a/test/e2e/pig/tests/multiquery.conf b/test/e2e/pig/tests/multiquery.conf
index 4bef81a..667659a 100644
--- a/test/e2e/pig/tests/multiquery.conf
+++ b/test/e2e/pig/tests/multiquery.conf
@@ -906,7 +906,38 @@
n = JOIN a BY name, m BY name;
store n into ':OUTPATH:';\,
- }
+ },
+ {
+ # Self join bloom left outer
+ 'num' => 12,
+ 'execonly' => 'tez',
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Self join bloom left outer with strategy as reduce
+ 'num' => 13,
+ 'execonly' => 'tez',
+ 'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+ },
] # end of tests
},
diff --git a/test/e2e/pig/tests/orc.conf b/test/e2e/pig/tests/orc.conf
index 6277586..9498d88 100644
--- a/test/e2e/pig/tests/orc.conf
+++ b/test/e2e/pig/tests/orc.conf
@@ -1,3 +1,21 @@
+#!/usr/bin/env perl
+############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+###############################################################################
$cfg = {
'driver' => 'Pig',
'nummachines' => 5,
diff --git a/test/org/apache/pig/test/TestEmptyInputDir.java b/test/org/apache/pig/test/TestEmptyInputDir.java
index ffeb34b..a9a46af 100644
--- a/test/org/apache/pig/test/TestEmptyInputDir.java
+++ b/test/org/apache/pig/test/TestEmptyInputDir.java
@@ -246,6 +246,66 @@
}
}
+ @Test
+ public void testBloomJoin() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+ w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+ w.println("C = join B by $0, A by $0 using 'bloom';");
+ w.println("D = join A by $0, B by $0 using 'bloom';");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.println("store D into 'output1';");
+ w.close();
+
+ try {
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+ assertEquals(0, stats.getNumberRecords("output1"));
+ assertEmptyOutputFile();
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, "output1");
+ }
+ }
+
+ @Test
+ public void testBloomJoinOuter() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+ w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+ w.println("C = join B by $0 left outer, A by $0 using 'bloom';");
+ w.println("D = join A by $0 left outer, B by $0 using 'bloom';");
+ w.println("E = join B by $0 right outer, A by $0 using 'bloom';");
+ w.println("F = join A by $0 right outer, B by $0 using 'bloom';");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.println("store D into 'output1';");
+ w.println("store E into 'output2';");
+ w.println("store F into 'output3';");
+ w.close();
+
+ try {
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+ assertEquals(2, stats.getNumberRecords("output1"));
+ assertEquals(2, stats.getNumberRecords("output2"));
+ assertEquals(0, stats.getNumberRecords("output3"));
+ assertEmptyOutputFile();
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, "output1");
+ Util.deleteFile(cluster, "output2");
+ Util.deleteFile(cluster, "output3");
+ }
+ }
+
private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
FileSystem fs = cluster.getFileSystem();
FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
new file mode 100644
index 0000000..c767523
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-48 -> Tez vertex scope-49,Tez vertex scope-50,
+Tez vertex scope-50 -> Tez vertex scope-46,Tez vertex scope-47,
+Tez vertex scope-46 -> Tez vertex scope-49,
+Tez vertex scope-47 -> Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-48
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{bytearray}(false) - scope-30 -> [ scope-49, scope-50]
+| |
+| Project[bytearray][0] - scope-31
+|
+|---c: New For Each(false,false)[bag] - scope-20
+ | |
+ | Project[bytearray][0] - scope-15
+ | |
+ | Cast[int] - scope-18
+ | |
+ | |---Project[bytearray][1] - scope-17
+ |
+ |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-50
+# Combine plan on edge <scope-48>
+Local Rearrange[tuple]{int}(false) - scope-55 -> scope-50
+| |
+| Project[int][0] - scope-54
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+# Plan on vertex
+POValueOutputTez - scope-52 -> [scope-46, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-51
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26 <- scope-50 -> scope-49
+| |
+| Project[bytearray][0] - scope-27
+|
+|---b: New For Each(false,false)[bag] - scope-6
+ | |
+ | Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-4
+ | |
+ | |---Project[bytearray][1] - scope-3
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-47
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-28 <- scope-50 -> scope-49
+| |
+| Project[bytearray][0] - scope-29
+|
+|---a: New For Each(false,false)[bag] - scope-13
+ | |
+ | Project[bytearray][0] - scope-8
+ | |
+ | Cast[int] - scope-11
+ | |
+ | |---Project[bytearray][1] - scope-10
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-7
+Tez vertex scope-49
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-45
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-44
+ | |
+ | Project[bytearray][2] - scope-36
+ | |
+ | Project[int][3] - scope-38
+ | |
+ | Project[int][1] - scope-40
+ | |
+ | Project[int][5] - scope-42
+ |
+ |---d: New For Each(true,true,true)[tuple] - scope-35
+ | |
+ | Project[bag][1] - scope-32
+ | |
+ | Project[bag][2] - scope-33
+ | |
+ | Project[bag][3] - scope-34
+ |
+ |---d: Package(Packager)[tuple]{bytearray} - scope-25
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld
new file mode 100644
index 0000000..c767523
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld
@@ -0,0 +1,91 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-48 -> Tez vertex scope-49,Tez vertex scope-50,
+Tez vertex scope-50 -> Tez vertex scope-46,Tez vertex scope-47,
+Tez vertex scope-46 -> Tez vertex scope-49,
+Tez vertex scope-47 -> Tez vertex scope-49,
+Tez vertex scope-49
+
+Tez vertex scope-48
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{bytearray}(false) - scope-30 -> [ scope-49, scope-50]
+| |
+| Project[bytearray][0] - scope-31
+|
+|---c: New For Each(false,false)[bag] - scope-20
+ | |
+ | Project[bytearray][0] - scope-15
+ | |
+ | Cast[int] - scope-18
+ | |
+ | |---Project[bytearray][1] - scope-17
+ |
+ |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-50
+# Combine plan on edge <scope-48>
+Local Rearrange[tuple]{int}(false) - scope-55 -> scope-50
+| |
+| Project[int][0] - scope-54
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+# Plan on vertex
+POValueOutputTez - scope-52 -> [scope-46, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-51
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26 <- scope-50 -> scope-49
+| |
+| Project[bytearray][0] - scope-27
+|
+|---b: New For Each(false,false)[bag] - scope-6
+ | |
+ | Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-4
+ | |
+ | |---Project[bytearray][1] - scope-3
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-47
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-28 <- scope-50 -> scope-49
+| |
+| Project[bytearray][0] - scope-29
+|
+|---a: New For Each(false,false)[bag] - scope-13
+ | |
+ | Project[bytearray][0] - scope-8
+ | |
+ | Cast[int] - scope-11
+ | |
+ | |---Project[bytearray][1] - scope-10
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-7
+Tez vertex scope-49
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-45
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-44
+ | |
+ | Project[bytearray][2] - scope-36
+ | |
+ | Project[int][3] - scope-38
+ | |
+ | Project[int][1] - scope-40
+ | |
+ | Project[int][5] - scope-42
+ |
+ |---d: New For Each(true,true,true)[tuple] - scope-35
+ | |
+ | Project[bag][1] - scope-32
+ | |
+ | Project[bag][2] - scope-33
+ | |
+ | Project[bag][3] - scope-34
+ |
+ |---d: Package(Packager)[tuple]{bytearray} - scope-25
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
new file mode 100644
index 0000000..de14a55
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
@@ -0,0 +1,83 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-39 -> Tez vertex scope-41,Tez vertex scope-42,
+Tez vertex scope-42 -> Tez vertex scope-40,
+Tez vertex scope-40 -> Tez vertex scope-41,
+Tez vertex scope-41
+
+Tez vertex scope-39
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{chararray}(false) - scope-20 -> [ scope-41, scope-42]
+| |
+| Project[chararray][0] - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[chararray] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-42
+# Combine plan on edge <scope-39>
+Local Rearrange[tuple]{int}(false) - scope-47 -> scope-42
+| |
+| Project[int][0] - scope-46
+|
+|---Package(BloomPackager)[tuple]{int} - scope-45
+# Plan on vertex
+POValueOutputTez - scope-44 -> [scope-40]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-43
+Tez vertex scope-40
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22 <- scope-42 -> scope-41
+| |
+| Project[chararray][0] - scope-23
+|
+|---b: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[chararray] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-41
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-38
+|
+|---e: New For Each(false,false,false)[bag] - scope-37
+ | |
+ | Project[chararray][0] - scope-31
+ | |
+ | Project[int][1] - scope-33
+ | |
+ | Project[int][3] - scope-35
+ |
+ |---d: New For Each(true,true)[tuple] - scope-30
+ | |
+ | Project[bag][1] - scope-24
+ | |
+ | POBinCond[bag] - scope-29
+ | |
+ | |---Project[bag][2] - scope-25
+ | |
+ | |---POUserFunc(org.apache.pig.builtin.IsEmpty)[boolean] - scope-27
+ | | |
+ | | |---Project[bag][2] - scope-26
+ | |
+ | |---Constant({(,)}) - scope-28
+ |
+ |---d: Package(Packager)[tuple]{chararray} - scope-19
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld
new file mode 100644
index 0000000..de14a55
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld
@@ -0,0 +1,83 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-39 -> Tez vertex scope-41,Tez vertex scope-42,
+Tez vertex scope-42 -> Tez vertex scope-40,
+Tez vertex scope-40 -> Tez vertex scope-41,
+Tez vertex scope-41
+
+Tez vertex scope-39
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{chararray}(false) - scope-20 -> [ scope-41, scope-42]
+| |
+| Project[chararray][0] - scope-21
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[chararray] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-42
+# Combine plan on edge <scope-39>
+Local Rearrange[tuple]{int}(false) - scope-47 -> scope-42
+| |
+| Project[int][0] - scope-46
+|
+|---Package(BloomPackager)[tuple]{int} - scope-45
+# Plan on vertex
+POValueOutputTez - scope-44 -> [scope-40]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-43
+Tez vertex scope-40
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22 <- scope-42 -> scope-41
+| |
+| Project[chararray][0] - scope-23
+|
+|---b: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[chararray] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-41
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-38
+|
+|---e: New For Each(false,false,false)[bag] - scope-37
+ | |
+ | Project[chararray][0] - scope-31
+ | |
+ | Project[int][1] - scope-33
+ | |
+ | Project[int][3] - scope-35
+ |
+ |---d: New For Each(true,true)[tuple] - scope-30
+ | |
+ | Project[bag][1] - scope-24
+ | |
+ | POBinCond[bag] - scope-29
+ | |
+ | |---Project[bag][2] - scope-25
+ | |
+ | |---POUserFunc(org.apache.pig.builtin.IsEmpty)[boolean] - scope-27
+ | | |
+ | | |---Project[bag][2] - scope-26
+ | |
+ | |---Constant({(,)}) - scope-28
+ |
+ |---d: Package(Packager)[tuple]{chararray} - scope-19
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
new file mode 100644
index 0000000..72a9b16
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-45 -> Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46 -> Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex group scope-59 -> Tez vertex scope-52,
+Tez vertex scope-52 -> Tez vertex scope-44,
+Tez vertex scope-44 -> Tez vertex scope-51,
+Tez vertex group scope-58 -> Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-45
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-60 -> [ scope-51, scope-52]
+| |
+| Project[int][0] - scope-61
+|
+|---b: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-46
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-62 -> [ scope-51, scope-52]
+| |
+| Project[int][0] - scope-63
+|
+|---c: New For Each(false,false)[bag] - scope-23
+ | |
+ | Cast[int] - scope-18
+ | |
+ | |---Project[bytearray][0] - scope-17
+ | |
+ | Cast[int] - scope-21
+ | |
+ | |---Project[bytearray][1] - scope-20
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex group scope-59 <- [scope-45, scope-46] -> scope-52
+# No plan on vertex group
+Tez vertex scope-52
+# Combine plan on edge <scope-45>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Combine plan on edge <scope-46>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54 -> [scope-44]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-44
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-30
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex group scope-58 <- [scope-45, scope-46] -> scope-51
+# No plan on vertex group
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+ | |
+ | Project[int][0] - scope-36
+ | |
+ | Project[int][1] - scope-38
+ | |
+ | Project[int][3] - scope-40
+ |
+ |---d: New For Each(true,true)[tuple] - scope-35
+ | |
+ | Project[bag][1] - scope-33
+ | |
+ | Project[bag][2] - scope-34
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld
new file mode 100644
index 0000000..72a9b16
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-45 -> Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex scope-46 -> Tez vertex group scope-58,Tez vertex group scope-59,
+Tez vertex group scope-59 -> Tez vertex scope-52,
+Tez vertex scope-52 -> Tez vertex scope-44,
+Tez vertex scope-44 -> Tez vertex scope-51,
+Tez vertex group scope-58 -> Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-45
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-60 -> [ scope-51, scope-52]
+| |
+| Project[int][0] - scope-61
+|
+|---b: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-46
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-62 -> [ scope-51, scope-52]
+| |
+| Project[int][0] - scope-63
+|
+|---c: New For Each(false,false)[bag] - scope-23
+ | |
+ | Cast[int] - scope-18
+ | |
+ | |---Project[bytearray][0] - scope-17
+ | |
+ | Cast[int] - scope-21
+ | |
+ | |---Project[bytearray][1] - scope-20
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex group scope-59 <- [scope-45, scope-46] -> scope-52
+# No plan on vertex group
+Tez vertex scope-52
+# Combine plan on edge <scope-45>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Combine plan on edge <scope-46>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54 -> [scope-44]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-44
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-30
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex group scope-58 <- [scope-45, scope-46] -> scope-51
+# No plan on vertex group
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+ | |
+ | Project[int][0] - scope-36
+ | |
+ | Project[int][1] - scope-38
+ | |
+ | Project[int][3] - scope-40
+ |
+ |---d: New For Each(true,true)[tuple] - scope-35
+ | |
+ | Project[bag][1] - scope-33
+ | |
+ | Project[bag][2] - scope-34
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
new file mode 100644
index 0000000..928b9a5
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
@@ -0,0 +1,97 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-44 -> Tez vertex scope-46,
+Tez vertex scope-45 -> Tez vertex scope-46,
+Tez vertex scope-50 -> Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52 -> Tez vertex scope-46,
+Tez vertex scope-46 -> Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-44
+# Plan on vertex
+POValueOutputTez - scope-48 -> [scope-46]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-45
+# Plan on vertex
+POValueOutputTez - scope-49 -> [scope-46]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-31 -> [ scope-51, scope-52]
+| |
+| Project[int][0] - scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-24
+ | |
+ | Cast[int] - scope-19
+ | |
+ | |---Project[bytearray][0] - scope-18
+ | |
+ | Cast[int] - scope-22
+ | |
+ | |---Project[bytearray][1] - scope-21
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-52
+# Combine plan on edge <scope-50>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54 -> [scope-46]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-30
+|
+|---POShuffledValueInputTez - scope-47 <- [scope-44, scope-45]
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+ | |
+ | Project[int][2] - scope-36
+ | |
+ | Project[int][3] - scope-38
+ | |
+ | Project[int][1] - scope-40
+ |
+ |---d: New For Each(true,true)[tuple] - scope-35
+ | |
+ | Project[bag][1] - scope-33
+ | |
+ | Project[bag][2] - scope-34
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld
new file mode 100644
index 0000000..928b9a5
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld
@@ -0,0 +1,97 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-44 -> Tez vertex scope-46,
+Tez vertex scope-45 -> Tez vertex scope-46,
+Tez vertex scope-50 -> Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52 -> Tez vertex scope-46,
+Tez vertex scope-46 -> Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-44
+# Plan on vertex
+POValueOutputTez - scope-48 -> [scope-46]
+|
+|---b: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-45
+# Plan on vertex
+POValueOutputTez - scope-49 -> [scope-46]
+|
+|---c: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-50
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-31 -> [ scope-51, scope-52]
+| |
+| Project[int][0] - scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-24
+ | |
+ | Cast[int] - scope-19
+ | |
+ | |---Project[bytearray][0] - scope-18
+ | |
+ | Cast[int] - scope-22
+ | |
+ | |---Project[bytearray][1] - scope-21
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-52
+# Combine plan on edge <scope-50>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54 -> [scope-46]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-46
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-30
+|
+|---POShuffledValueInputTez - scope-47 <- [scope-44, scope-45]
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-43
+|
+|---e: New For Each(false,false,false)[bag] - scope-42
+ | |
+ | Project[int][2] - scope-36
+ | |
+ | Project[int][3] - scope-38
+ | |
+ | Project[int][1] - scope-40
+ |
+ |---d: New For Each(true,true)[tuple] - scope-35
+ | |
+ | Project[bag][1] - scope-33
+ | |
+ | Project[bag][2] - scope-34
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-28
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
new file mode 100644
index 0000000..a426025
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
@@ -0,0 +1,107 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-60 -> Tez vertex scope-61,Tez vertex scope-62,
+Tez vertex scope-62 -> Tez vertex scope-54,Tez vertex scope-58,
+Tez vertex scope-54 -> Tez vertex scope-58,Tez vertex scope-61,
+Tez vertex scope-58 -> Tez vertex scope-61,
+Tez vertex scope-61
+
+Tez vertex scope-60
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-38 -> [ scope-61, scope-62]
+| |
+| Project[int][0] - scope-39
+|
+|---b: New For Each(false,false)[bag] - scope-28
+ | |
+ | Cast[int] - scope-23
+ | |
+ | |---Project[bytearray][0] - scope-22
+ | |
+ | Cast[int] - scope-26
+ | |
+ | |---Project[bytearray][1] - scope-25
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-21
+Tez vertex scope-62
+# Combine plan on edge <scope-60>
+Local Rearrange[tuple]{int}(false) - scope-67 -> scope-62
+| |
+| Project[int][0] - scope-66
+|
+|---Package(BloomPackager)[tuple]{int} - scope-65
+# Plan on vertex
+POValueOutputTez - scope-64 -> [scope-54, scope-58]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-63
+Tez vertex scope-54
+# Plan on vertex
+a: Split - scope-68
+| |
+| d: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-62 -> scope-61
+| | |
+| | Project[int][0] - scope-35
+| |
+| |---a1: Filter[bag] - scope-11
+| | |
+| | Equal To[boolean] - scope-14
+| | |
+| | |---Project[int][0] - scope-12
+| | |
+| | |---Constant(3) - scope-13
+| |
+| POValueOutputTez - scope-55 -> [scope-58]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-58
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-36 <- scope-62 -> scope-61
+| |
+| Project[int][0] - scope-37
+|
+|---a2: Filter[bag] - scope-17
+ | |
+ | Equal To[boolean] - scope-20
+ | |
+ | |---Project[int][0] - scope-18
+ | |
+ | |---Constant(4) - scope-19
+ |
+ |---POValueInputTez - scope-59 <- scope-54
+Tez vertex scope-61
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-53
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-52
+ | |
+ | Project[int][0] - scope-44
+ | |
+ | Project[int][1] - scope-46
+ | |
+ | Project[int][3] - scope-48
+ | |
+ | Project[int][5] - scope-50
+ |
+ |---d: New For Each(true,true,true)[tuple] - scope-43
+ | |
+ | Project[bag][1] - scope-40
+ | |
+ | Project[bag][2] - scope-41
+ | |
+ | Project[bag][3] - scope-42
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld
new file mode 100644
index 0000000..a426025
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld
@@ -0,0 +1,107 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-60 -> Tez vertex scope-61,Tez vertex scope-62,
+Tez vertex scope-62 -> Tez vertex scope-54,Tez vertex scope-58,
+Tez vertex scope-54 -> Tez vertex scope-58,Tez vertex scope-61,
+Tez vertex scope-58 -> Tez vertex scope-61,
+Tez vertex scope-61
+
+Tez vertex scope-60
+# Plan on vertex
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-38 -> [ scope-61, scope-62]
+| |
+| Project[int][0] - scope-39
+|
+|---b: New For Each(false,false)[bag] - scope-28
+ | |
+ | Cast[int] - scope-23
+ | |
+ | |---Project[bytearray][0] - scope-22
+ | |
+ | Cast[int] - scope-26
+ | |
+ | |---Project[bytearray][1] - scope-25
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-21
+Tez vertex scope-62
+# Combine plan on edge <scope-60>
+Local Rearrange[tuple]{int}(false) - scope-67 -> scope-62
+| |
+| Project[int][0] - scope-66
+|
+|---Package(BloomPackager)[tuple]{int} - scope-65
+# Plan on vertex
+POValueOutputTez - scope-64 -> [scope-54, scope-58]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-63
+Tez vertex scope-54
+# Plan on vertex
+a: Split - scope-68
+| |
+| d: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-62 -> scope-61
+| | |
+| | Project[int][0] - scope-35
+| |
+| |---a1: Filter[bag] - scope-11
+| | |
+| | Equal To[boolean] - scope-14
+| | |
+| | |---Project[int][0] - scope-12
+| | |
+| | |---Constant(3) - scope-13
+| |
+| POValueOutputTez - scope-55 -> [scope-58]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-58
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-36 <- scope-62 -> scope-61
+| |
+| Project[int][0] - scope-37
+|
+|---a2: Filter[bag] - scope-17
+ | |
+ | Equal To[boolean] - scope-20
+ | |
+ | |---Project[int][0] - scope-18
+ | |
+ | |---Constant(4) - scope-19
+ |
+ |---POValueInputTez - scope-59 <- scope-54
+Tez vertex scope-61
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-53
+|
+|---e: New For Each(false,false,false,false)[bag] - scope-52
+ | |
+ | Project[int][0] - scope-44
+ | |
+ | Project[int][1] - scope-46
+ | |
+ | Project[int][3] - scope-48
+ | |
+ | Project[int][5] - scope-50
+ |
+ |---d: New For Each(true,true,true)[tuple] - scope-43
+ | |
+ | Project[bag][1] - scope-40
+ | |
+ | Project[bag][2] - scope-41
+ | |
+ | Project[bag][3] - scope-42
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
new file mode 100644
index 0000000..c5a2000
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-49 -> Tez vertex scope-56,Tez vertex scope-57,
+Tez vertex scope-57 -> Tez vertex scope-53,
+Tez vertex scope-53 -> Tez vertex scope-56,
+Tez vertex scope-56
+
+Tez vertex scope-49
+# Plan on vertex
+a: Split - scope-63
+| |
+| a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-15
+| |
+| |---a2: Filter[bag] - scope-11
+| | |
+| | Equal To[boolean] - scope-14
+| | |
+| | |---Project[int][0] - scope-12
+| | |
+| | |---Constant(4) - scope-13
+| |
+| d: BuildBloom Rearrange[tuple]{int}(false) - scope-36 -> [ scope-56, scope-57]
+| | |
+| | Project[int][0] - scope-37
+| |
+| |---a1: Filter[bag] - scope-26
+| | |
+| | Equal To[boolean] - scope-29
+| | |
+| | |---Project[int][0] - scope-27
+| | |
+| | |---Constant(3) - scope-28
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Combine plan on edge <scope-49>
+Local Rearrange[tuple]{int}(false) - scope-62 -> scope-57
+| |
+| Project[int][0] - scope-61
+|
+|---Package(BloomPackager)[tuple]{int} - scope-60
+# Plan on vertex
+POValueOutputTez - scope-59 -> [scope-53]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-58
+Tez vertex scope-53
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-57 -> scope-56
+| |
+| Project[int][0] - scope-35
+|
+|---b: New For Each(false,false)[bag] - scope-23
+ | |
+ | Cast[int] - scope-18
+ | |
+ | |---Project[bytearray][0] - scope-17
+ | |
+ | Cast[int] - scope-21
+ | |
+ | |---Project[bytearray][1] - scope-20
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex scope-56
+# Plan on vertex
+e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-48
+|
+|---e: New For Each(false,false,false)[bag] - scope-47
+ | |
+ | Project[int][2] - scope-41
+ | |
+ | Project[int][3] - scope-43
+ | |
+ | Project[int][1] - scope-45
+ |
+ |---d: New For Each(true,true)[tuple] - scope-40
+ | |
+ | Project[bag][1] - scope-38
+ | |
+ | Project[bag][2] - scope-39
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld
new file mode 100644
index 0000000..c5a2000
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-49 -> Tez vertex scope-56,Tez vertex scope-57,
+Tez vertex scope-57 -> Tez vertex scope-53,
+Tez vertex scope-53 -> Tez vertex scope-56,
+Tez vertex scope-56
+
+Tez vertex scope-49
+# Plan on vertex
+a: Split - scope-63
+| |
+| a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-15
+| |
+| |---a2: Filter[bag] - scope-11
+| | |
+| | Equal To[boolean] - scope-14
+| | |
+| | |---Project[int][0] - scope-12
+| | |
+| | |---Constant(4) - scope-13
+| |
+| d: BuildBloom Rearrange[tuple]{int}(false) - scope-36 -> [ scope-56, scope-57]
+| | |
+| | Project[int][0] - scope-37
+| |
+| |---a1: Filter[bag] - scope-26
+| | |
+| | Equal To[boolean] - scope-29
+| | |
+| | |---Project[int][0] - scope-27
+| | |
+| | |---Constant(3) - scope-28
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-57
+# Combine plan on edge <scope-49>
+Local Rearrange[tuple]{int}(false) - scope-62 -> scope-57
+| |
+| Project[int][0] - scope-61
+|
+|---Package(BloomPackager)[tuple]{int} - scope-60
+# Plan on vertex
+POValueOutputTez - scope-59 -> [scope-53]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-58
+Tez vertex scope-53
+# Plan on vertex
+d: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-57 -> scope-56
+| |
+| Project[int][0] - scope-35
+|
+|---b: New For Each(false,false)[bag] - scope-23
+ | |
+ | Cast[int] - scope-18
+ | |
+ | |---Project[bytearray][0] - scope-17
+ | |
+ | Cast[int] - scope-21
+ | |
+ | |---Project[bytearray][1] - scope-20
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-16
+Tez vertex scope-56
+# Plan on vertex
+e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-48
+|
+|---e: New For Each(false,false,false)[bag] - scope-47
+ | |
+ | Project[int][2] - scope-41
+ | |
+ | Project[int][3] - scope-43
+ | |
+ | Project[int][1] - scope-45
+ |
+ |---d: New For Each(true,true)[tuple] - scope-40
+ | |
+ | Project[bag][1] - scope-38
+ | |
+ | Project[bag][2] - scope-39
+ |
+ |---d: Package(Packager)[tuple]{int} - scope-33
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
new file mode 100644
index 0000000..1acbb13
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-43 -> Tez vertex scope-45,Tez vertex scope-47,Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52 -> Tez vertex scope-45,Tez vertex scope-47,
+Tez vertex scope-45 -> Tez vertex scope-51,
+Tez vertex scope-47 -> Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-43
+# Plan on vertex
+a: Split - scope-58
+| |
+| e: BuildBloom Rearrange[tuple]{int}(false) - scope-36 -> [ scope-51, scope-52]
+| | |
+| | Project[int][0] - scope-37
+| |
+| |---d: Filter[bag] - scope-23
+| | |
+| | Greater Than[boolean] - scope-26
+| | |
+| | |---Project[int][0] - scope-24
+| | |
+| | |---Constant(10) - scope-25
+| |
+| POValueOutputTez - scope-44 -> [scope-45, scope-47]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-52
+# Combine plan on edge <scope-43>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54 -> [scope-45, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-45
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-32 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-33
+|
+|---b: Filter[bag] - scope-11
+ | |
+ | Less Than[boolean] - scope-14
+ | |
+ | |---Project[int][0] - scope-12
+ | |
+ | |---Constant(5) - scope-13
+ |
+ |---POValueInputTez - scope-46 <- scope-43
+Tez vertex scope-47
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-35
+|
+|---c: Filter[bag] - scope-17
+ | |
+ | Equal To[boolean] - scope-20
+ | |
+ | |---Project[int][0] - scope-18
+ | |
+ | |---Constant(10) - scope-19
+ |
+ |---POValueInputTez - scope-48 <- scope-43
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-42
+|
+|---e: New For Each(true,true,true)[tuple] - scope-41
+ | |
+ | Project[bag][1] - scope-38
+ | |
+ | Project[bag][2] - scope-39
+ | |
+ | Project[bag][3] - scope-40
+ |
+ |---e: Package(Packager)[tuple]{int} - scope-31
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld
new file mode 100644
index 0000000..1acbb13
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld
@@ -0,0 +1,95 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-43 -> Tez vertex scope-45,Tez vertex scope-47,Tez vertex scope-51,Tez vertex scope-52,
+Tez vertex scope-52 -> Tez vertex scope-45,Tez vertex scope-47,
+Tez vertex scope-45 -> Tez vertex scope-51,
+Tez vertex scope-47 -> Tez vertex scope-51,
+Tez vertex scope-51
+
+Tez vertex scope-43
+# Plan on vertex
+a: Split - scope-58
+| |
+| e: BuildBloom Rearrange[tuple]{int}(false) - scope-36 -> [ scope-51, scope-52]
+| | |
+| | Project[int][0] - scope-37
+| |
+| |---d: Filter[bag] - scope-23
+| | |
+| | Greater Than[boolean] - scope-26
+| | |
+| | |---Project[int][0] - scope-24
+| | |
+| | |---Constant(10) - scope-25
+| |
+| POValueOutputTez - scope-44 -> [scope-45, scope-47]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-52
+# Combine plan on edge <scope-43>
+Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52
+| |
+| Project[int][0] - scope-56
+|
+|---Package(BloomPackager)[tuple]{int} - scope-55
+# Plan on vertex
+POValueOutputTez - scope-54 -> [scope-45, scope-47]
+|
+|---Package(BloomPackager)[tuple]{int} - scope-53
+Tez vertex scope-45
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-32 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-33
+|
+|---b: Filter[bag] - scope-11
+ | |
+ | Less Than[boolean] - scope-14
+ | |
+ | |---Project[int][0] - scope-12
+ | |
+ | |---Constant(5) - scope-13
+ |
+ |---POValueInputTez - scope-46 <- scope-43
+Tez vertex scope-47
+# Plan on vertex
+e: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-52 -> scope-51
+| |
+| Project[int][0] - scope-35
+|
+|---c: Filter[bag] - scope-17
+ | |
+ | Equal To[boolean] - scope-20
+ | |
+ | |---Project[int][0] - scope-18
+ | |
+ | |---Constant(10) - scope-19
+ |
+ |---POValueInputTez - scope-48 <- scope-43
+Tez vertex scope-51
+# Plan on vertex
+e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-42
+|
+|---e: New For Each(true,true,true)[tuple] - scope-41
+ | |
+ | Project[bag][1] - scope-38
+ | |
+ | Project[bag][2] - scope-39
+ | |
+ | Project[bag][3] - scope-40
+ |
+ |---e: Package(Packager)[tuple]{int} - scope-31
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index d01a4c2..801c195 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -88,6 +88,7 @@
pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY);
pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION);
pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
+ pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY);
pigServer = new PigServer(pc);
}
@@ -178,6 +179,125 @@
}
@Test
+ public void testBloomJoin() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x, y:int);" +
+ "b = load 'file:///tmp/input2' as (x, z:int);" +
+ "c = load 'file:///tmp/input2' as (x, w:int);" +
+ "d = join b by x, a by x, c by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z, w;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomJoinLeftOuter() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:chararray, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:chararray, z:int);" +
+ "d = join a by x left, b by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomJoinUnion() throws Exception {
+ // Left input from a union
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+ "b = union b, c;" +
+ "d = join a by x, b by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld");
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+ resetScope();
+ // Right input from a union
+ query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = load 'file:///tmp/input3' as (x:int, z:int);" +
+ "b = union b, c;" +
+ "d = join b by x, a by x using 'bloom';" +
+ "e = foreach d generate a::x as x, y, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ // Needs shared edges and PIG-3856 to be a more optimial plan
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomJoinSplit() throws Exception {
+ // Left input from a split
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "a1 = filter a by x == 3;" +
+ "a2 = filter a by x == 4;" +
+ "d = join a1 by x, a2 by x, b by x using 'bloom';" +
+ "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld");
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null);
+
+ resetScope();
+ // Right input from a split
+ query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "a1 = filter a by x == 3;" +
+ "a2 = filter a by x == 4;" +
+ "d = join b by x, a1 by x using 'bloom';" +
+ "e = foreach d generate a1::x as x, y, z;" +
+ "store a2 into 'file:///tmp/pigoutput/a2';" +
+ "store e into 'file:///tmp/pigoutput/e';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld");
+ }
+
+ @Test
+ public void testBloomSelfJoin() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = filter a by x < 5;" +
+ "c = filter a by x == 10;" +
+ "d = filter a by x > 10;" +
+ "e = join b by x, c by x, d by x using 'bloom';" +
+ "store e into 'file:///tmp/pigoutput';";
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld");
+ }
+
+ @Test
public void testSelfJoin() throws Exception {
String query =
"a = load 'file:///tmp/input1' as (x:int, y:int);" +