Added the ability to specify the number of reducers when doing a sharded join.
Signed-off-by: Josh Wills <jwills@apache.org>
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java
index b881e66..2a38457 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java
@@ -63,6 +63,16 @@
public ShardedJoinStrategy(int numShards) {
this(new ConstantShardingStrategy<K>(numShards));
}
+
+ /**
+ * Instantiate with a constant number of shards for all keys and an explicit reducer count.
+ *
+ * @param numShards number of shards to use
+ * @param numReducers the number of reducers to run the join with; must be at least 1
+ */
+ public ShardedJoinStrategy(int numShards, int numReducers) {
+ this(new ConstantShardingStrategy<K>(numShards), numReducers);
+ }
/**
* Instantiate with a custom sharding strategy.
@@ -74,6 +84,20 @@
this.shardingStrategy = shardingStrategy;
}
+ /**
+ * Instantiate with a custom sharding strategy and a specified number of reducers.
+ *
+ * @param shardingStrategy strategy to be used for sharding
+ * @param numReducers number of reducers for the join; values below 1 throw IllegalArgumentException
+ */
+ public ShardedJoinStrategy(ShardingStrategy<K> shardingStrategy, int numReducers) {
+ if (numReducers < 1) {
+ throw new IllegalArgumentException("Num reducers must be > 0, got " + numReducers);
+ }
+ this.wrappedJoinStrategy = new DefaultJoinStrategy<Pair<K, Integer>, U, V>(numReducers);
+ this.shardingStrategy = shardingStrategy;
+ }
+
@Override
public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {