PIG-5439: pig.exec.reducers.max does not take effect for skewed join (vnarayanan7 via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1906719 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 82f8bf9..8019aff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -120,6 +120,8 @@
  
 BUG FIXES
 
+PIG-5439: pig.exec.reducers.max does not take effect for skewed join (vnarayanan7 via rohini)
+
 PIG-5431: Date datatype is different between Hive 1.x and Hive 3.x (rohini)
 
 PIG-5433: Fix test failures with TestHBaseStorage and htrace dependency (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java
index 249b897..f224d57 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/FindQuantilesTez.java
@@ -84,7 +84,8 @@
 
             long estimatedInputSize = (long)((double)sampleSize/mySamples.size() * totalInputRows);
             estimatedNumReducers = (int)Math.ceil((double)estimatedInputSize/bytesPerTask);
-            estimatedNumReducers = Math.min(estimatedNumReducers, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+            int maxReducers = PigMapReduce.sJobConfInternal.get().getInt(InputSizeReducerEstimator.MAX_REDUCER_COUNT_PARAM, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+            estimatedNumReducers = Math.min(estimatedNumReducers, maxReducers);
             if (estimatedNumReducers==0) {
                 estimatedNumReducers = 1;
             }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java
index e4a8044..052d541 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/PartitionSkewedKeysTez.java
@@ -70,7 +70,8 @@
                     InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
 
             estimatedNumReducers = (int)Math.ceil((double)estimatedInputSize/bytesPerTask);
-            estimatedNumReducers = Math.min(estimatedNumReducers, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+            int maxReducers = PigMapReduce.sJobConfInternal.get().getInt(InputSizeReducerEstimator.MAX_REDUCER_COUNT_PARAM, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+            estimatedNumReducers = Math.min(estimatedNumReducers, maxReducers);
 
             LOG.info("Estimating parallelism: estimatedInputSize is " + estimatedInputSize + ". bytesPerTask is " + bytesPerTask + ". estimatedNumReducers is " + estimatedNumReducers + ".");