PIG-5439: pig.exec.reducers.max does not take effect for skewed join (vnarayanan7 via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1906719 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 82f8bf9..8019aff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -120,6 +120,8 @@
BUG FIXES
+PIG-5439: pig.exec.reducers.max does not take effect for skewed join (vnarayanan7 via rohini)
+
PIG-5431: Date datatype is different between Hive 1.x and Hive 3.x (rohini)
PIG-5433: Fix test failures with TestHBaseStorage and htrace dependency (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java
index 249b897..f224d57 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java
@@ -84,7 +84,8 @@
long estimatedInputSize = (long)((double)sampleSize/mySamples.size() * totalInputRows);
estimatedNumReducers = (int)Math.ceil((double)estimatedInputSize/bytesPerTask);
- estimatedNumReducers = Math.min(estimatedNumReducers, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+ int maxReducers = PigMapReduce.sJobConfInternal.get().getInt(InputSizeReducerEstimator.MAX_REDUCER_COUNT_PARAM, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+ estimatedNumReducers = Math.min(estimatedNumReducers, maxReducers);
if (estimatedNumReducers==0) {
estimatedNumReducers = 1;
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java
index e4a8044..052d541 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java
@@ -70,7 +70,8 @@
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
estimatedNumReducers = (int)Math.ceil((double)estimatedInputSize/bytesPerTask);
- estimatedNumReducers = Math.min(estimatedNumReducers, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+ int maxReducers = PigMapReduce.sJobConfInternal.get().getInt(InputSizeReducerEstimator.MAX_REDUCER_COUNT_PARAM, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+ estimatedNumReducers = Math.min(estimatedNumReducers, maxReducers);
LOG.info("Estimating parallelism: estimatedInputSize is " + estimatedInputSize + ". bytesPerTask is " + bytesPerTask + ". estimatedNumReducers is " + estimatedNumReducers + ".");