HIVE-28428: Performance regression in map-hash aggregation (Ryu Kobayashi, reviewed by Denys Kuzmenko)
Closes #5380
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index af3a757..51dc93d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2012,6 +2012,8 @@ public static enum ConfVars {
"Set to 1 to make sure hash aggregation is never turned off."),
HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND("hive.map.aggr.hash.min.reduction.lower.bound", (float) 0.4,
"Lower bound of Hash aggregate reduction filter. See also: hive.map.aggr.hash.min.reduction"),
+ HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT("hive.map.aggr.hash.flush.size.percent", (float) 0.1,
+ "Fraction (between 0 and 1, e.g. 0.1 means 10%) of hash table entries to flush in map-side group aggregation."),
HIVE_MAP_AGGR_HASH_MIN_REDUCTION_STATS_ADJUST("hive.map.aggr.hash.min.reduction.stats", true,
"Whether the value for hive.map.aggr.hash.min.reduction should be set statically using stats estimates. \n" +
"If this is enabled, the default value for hive.map.aggr.hash.min.reduction is only used as an upper-bound\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 326c351..88b7d54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -119,6 +119,7 @@ public class GroupByOperator extends Operator<GroupByDesc> implements IConfigure
private transient int groupbyMapAggrInterval;
private transient long numRowsCompareHashAggr;
private transient float minReductionHashAggr;
+ private transient float hashAggrFlushPercent;
private transient int outputKeyLength;
@@ -372,6 +373,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
// compare every groupbyMapAggrInterval rows
numRowsCompareHashAggr = groupbyMapAggrInterval;
minReductionHashAggr = conf.getMinReductionHashAggr();
+ hashAggrFlushPercent = conf.getHashAggrFlushPercent();
}
List<String> fieldNames = new ArrayList<String>(conf.getOutputColumnNames());
@@ -951,9 +953,6 @@ else if (o instanceof ByteArrayRef){
private void flushHashTable(boolean complete) throws HiveException {
countAfterReport = 0;
- // Currently, the algorithm flushes 10% of the entries - this can be
- // changed in the future
-
if (complete) {
for (Map.Entry<KeyWrapper, AggregationBuffer[]> entry : hashAggregations.entrySet()) {
forward(entry.getKey().getKeyArray(), entry.getValue());
@@ -964,7 +963,8 @@ private void flushHashTable(boolean complete) throws HiveException {
}
int oldSize = hashAggregations.size();
- LOG.info("Hash Tbl flush: #hash table = {}", oldSize);
+ int flushSize = (int) (oldSize * hashAggrFlushPercent);
+ LOG.trace("Hash Tbl flush: #hash table = {}, flush size = {}", oldSize, flushSize);
Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> iter = hashAggregations
.entrySet().iterator();
@@ -974,8 +974,8 @@ private void flushHashTable(boolean complete) throws HiveException {
forward(m.getKey().getKeyArray(), m.getValue());
iter.remove();
numDel++;
- if (numDel * 10 >= oldSize) {
- LOG.info("Hash Table flushed: new size = {}", hashAggregations.size());
+ if (numDel >= flushSize) {
+ LOG.trace("Hash Table flushed: new size = {}", hashAggregations.size());
return;
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index bc4bff8..7f473e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -502,6 +502,9 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
float minReductionHashAggrLowerBound =
HiveConf.getFloatVar(parseContext.getConf(),
ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent =
+ HiveConf.getFloatVar(parseContext.getConf(),
+ ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
List<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
ExprNodeDesc groupByExpr =
@@ -511,7 +514,8 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
GroupByDesc groupBy =
new GroupByDesc(GroupByDesc.Mode.HASH, outputNames, groupByExprs,
new ArrayList<AggregationDesc>(), false, groupByMemoryUsage, memoryThreshold,
- minReductionHashAggr, minReductionHashAggrLowerBound, null, false, -1, true);
+ minReductionHashAggr, minReductionHashAggrLowerBound,
+ hashAggrFlushPercent, null, false, -1, true);
List<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
groupbyColInfos.add(new ColumnInfo(outputNames.get(0), key.getTypeInfo(), "", false));
@@ -623,6 +627,9 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
float minReductionHashAggrLowerBound =
HiveConf.getFloatVar(parseContext.getConf(),
ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent =
+ HiveConf.getFloatVar(parseContext.getConf(),
+ ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
// Add min/max and bloom filter aggregations
List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
@@ -673,7 +680,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
GroupByDesc groupBy = new GroupByDesc(GroupByDesc.Mode.HASH,
gbOutputNames, new ArrayList<ExprNodeDesc>(), aggs, false,
groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
- null, false, -1, false);
+ hashAggrFlushPercent, null, false, -1, false);
List<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(0), key.getTypeInfo(), "", false));
@@ -771,7 +778,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
GroupByDesc groupByDescFinal = new GroupByDesc(GroupByDesc.Mode.FINAL,
gbOutputNames, new ArrayList<ExprNodeDesc>(), aggsFinal, false,
groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
- null, false, 0, false);
+ hashAggrFlushPercent, null, false, 0, false);
GroupByOperator groupByOpFinal = (GroupByOperator)OperatorFactory.getAndMakeChild(
groupByDescFinal, new RowSchema(rsOp.getSchema()), rsOp);
groupByOpFinal.setColumnExprMap(new HashMap<String, ExprNodeDesc>());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
index d3764dc..37cd180 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
@@ -403,9 +403,10 @@ private static GroupByOperator createGroupBy(SelectOperator selectOp, Operator<?
float memoryThreshold = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_MEMORY_THRESHOLD);
float minReductionHashAggr = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
float minReductionHashAggrLowerBound = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
GroupByDesc groupBy =
new GroupByDesc(gbMode, gbOutputNames, Collections.emptyList(), gbAggs, false, groupByMemoryUsage,
- memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound, null, false, -1, false);
+ memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound, hashAggrFlushPercent, null, false, -1, false);
groupBy.setColumnExprMap(Collections.emptyMap());
return (GroupByOperator) OperatorFactory.getAndMakeChild(groupBy, new RowSchema(gbColInfos), parentOp);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
index 5142f4f..871c31b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
@@ -124,6 +124,7 @@ private static class GBInfo {
float memoryThreshold;
float minReductionHashAggr;
float minReductionHashAggrLowerBound;
+ float hashAggrFlushPercent; // NOTE(review): no hunk in this patch assigns this field; an unassigned float is 0.0f - confirm getGBInfo populates it from HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT
private HIVEGBPHYSICALMODE gbPhysicalPipelineMode;
@@ -834,7 +835,7 @@ private static OpAttr genReduceSideGB2(OpAttr inputOpAf, GBInfo gbInfo) throws S
Operator rsGBOp2 = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.FINAL,
outputColNames, gbKeys, aggregations, false, gbInfo.groupByMemoryUsage,
gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, gbInfo.minReductionHashAggrLowerBound,
- null, false, groupingSetsPosition, gbInfo.containsDistinctAggr),
+ gbInfo.hashAggrFlushPercent, null, false, groupingSetsPosition, gbInfo.containsDistinctAggr),
new RowSchema(colInfoLst), rs);
rsGBOp2.setColumnExprMap(colExprMap);
@@ -973,7 +974,7 @@ private static OpAttr genReduceSideGB1(OpAttr inputOpAf, GBInfo gbInfo, boolean
&& !(gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT);
Operator rsGBOp = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames,
gbKeys, aggregations, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold,
- gbInfo.minReductionHashAggr, gbInfo.minReductionHashAggrLowerBound,
+ gbInfo.minReductionHashAggr, gbInfo.minReductionHashAggrLowerBound, gbInfo.hashAggrFlushPercent,
gbInfo.grpSets, includeGrpSetInGBDesc, groupingSetsColPosition, gbInfo.containsDistinctAggr),
new RowSchema(colInfoLst), rs);
@@ -1116,7 +1117,7 @@ private static OpAttr genReduceSideGB1NoMapGB(OpAttr inputOpAf, GBInfo gbInfo,
+ // Float arguments follow the GroupByDesc constructor order: groupByMemoryUsage, memoryThreshold,
+ // minReductionHashAggr, minReductionHashAggrLowerBound, hashAggrFlushPercent. They are all floats,
+ // so a swapped order compiles silently and stores each setting in the wrong field.
Operator rsGB1 = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames,
- gbKeys, aggregations, false, gbInfo.groupByMemoryUsage, gbInfo.minReductionHashAggrLowerBound,
- gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, null,
+ gbKeys, aggregations, false, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold,
+ gbInfo.minReductionHashAggr, gbInfo.minReductionHashAggrLowerBound, gbInfo.hashAggrFlushPercent, null,
false, -1, numDistinctUDFs > 0), new RowSchema(colInfoLst), rs);
rsGB1.setColumnExprMap(colExprMap);
@@ -1213,7 +1214,7 @@ private static OpAttr genMapSideGB(OpAttr inputOpAf, GBInfo gbAttrs) throws Sema
Operator gbOp = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.HASH,
outputColNames, gbKeys, aggregations, false, gbAttrs.groupByMemoryUsage,
gbAttrs.memoryThreshold, gbAttrs.minReductionHashAggr, gbAttrs.minReductionHashAggrLowerBound,
- gbAttrs.grpSets, inclGrpID, groupingSetsPosition,
+ gbAttrs.hashAggrFlushPercent, gbAttrs.grpSets, inclGrpID, groupingSetsPosition,
gbAttrs.containsDistinctAggr), new RowSchema(colInfoLst), inputOpAf.inputs.get(0));
// 5. Setup Expr Col Map
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index ca9d599..cd19ce7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -5418,11 +5418,13 @@ private Operator genGroupByPlanGroupByOperator(QBParseInfo parseInfo,
.getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
float minReductionHashAggrLowerBound = HiveConf
.getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent = HiveConf
+ .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
- null, false, -1, numDistinctUDFs > 0),
+ hashAggrFlushPercent, null, false, -1, numDistinctUDFs > 0),
new RowSchema(groupByOutputRowResolver.getColumnInfos()),
input), groupByOutputRowResolver);
op.setColumnExprMap(colExprMap);
@@ -5687,6 +5689,8 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo,
.getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
float minReductionHashAggrLowerBound = HiveConf
.getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent = HiveConf
+ .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
// Nothing special needs to be done for grouping sets if
// this is the final group by operator, and multiple rows corresponding to the
@@ -5696,6 +5700,7 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo,
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
+ hashAggrFlushPercent,
groupingSets,
groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
groupingSetsPosition, containsDistinctAggr),
@@ -5868,10 +5873,12 @@ private Operator genGroupByPlanMapGroupByOperator(QB qb,
.getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
float minReductionHashAggrLowerBound = HiveConf
.getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent = HiveConf
+ .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(GroupByDesc.Mode.HASH, outputColumnNames, groupByKeys, aggregations,
false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
- groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
+ hashAggrFlushPercent, groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
new RowSchema(groupByOutputRowResolver.getColumnInfos()),
inputOperatorInfo), groupByOutputRowResolver);
op.setColumnExprMap(colExprMap);
@@ -6405,11 +6412,13 @@ private Operator genGroupByPlanGroupByOperator2MR(QBParseInfo parseInfo,
.getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
float minReductionHashAggrLowerBound = HiveConf
.getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent = HiveConf
+ .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(GroupByDesc.Mode.FINAL, outputColumnNames, groupByKeys, aggregations,
false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
- null, false,
+ hashAggrFlushPercent, null, false,
groupingSetsPosition, containsDistinctAggr),
new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
reduceSinkOperatorInfo2), groupByOutputRowResolver2);
@@ -10108,10 +10117,12 @@ private Operator genMapGroupByForSemijoin(List<ASTNode> fields, Operator<?> inpu
.getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
float minReductionHashAggrLowerBound = HiveConf
.getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+ float hashAggrFlushPercent = HiveConf
+ .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(GroupByDesc.Mode.HASH, outputColumnNames, groupByKeys, aggregations,
false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
- null, false, -1, false),
+ hashAggrFlushPercent, null, false, -1, false),
new RowSchema(groupByOutputRowResolver.getColumnInfos()),
input), groupByOutputRowResolver);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index 6fe0cd7..f8f1daa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -74,6 +74,7 @@ public static enum Mode {
private float memoryThreshold;
private float minReductionHashAggr;
private float minReductionHashAggrLowerBound;
+ private float hashAggrFlushPercent;
transient private boolean isDistinct;
private boolean dontResetAggrsDistinct;
@@ -89,13 +90,14 @@ public GroupByDesc(
final float memoryThreshold,
final float minReductionHashAggr,
final float minReductionHashAggrLowerBound,
+ final float hashAggrFlushPercent,
final List<Long> listGroupingSets,
final boolean groupingSetsPresent,
final int groupingSetsPosition,
final boolean isDistinct) {
this(mode, outputColumnNames, keys, aggregators,
false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
- listGroupingSets, groupingSetsPresent, groupingSetsPosition, isDistinct);
+ hashAggrFlushPercent, listGroupingSets, groupingSetsPresent, groupingSetsPosition, isDistinct);
}
public GroupByDesc(
@@ -108,6 +110,7 @@ public GroupByDesc(
final float memoryThreshold,
final float minReductionHashAggr,
final float minReductionHashAggrLowerBound,
+ final float hashAggrFlushPercent,
final List<Long> listGroupingSets,
final boolean groupingSetsPresent,
final int groupingSetsPosition,
@@ -121,6 +124,7 @@ public GroupByDesc(
this.memoryThreshold = memoryThreshold;
this.minReductionHashAggr = minReductionHashAggr;
this.minReductionHashAggrLowerBound = minReductionHashAggrLowerBound;
+ this.hashAggrFlushPercent = hashAggrFlushPercent;
this.listGroupingSets = listGroupingSets;
this.groupingSetsPresent = groupingSetsPresent;
this.groupingSetPosition = groupingSetsPosition;
@@ -336,6 +340,14 @@ public void setDistinct(boolean isDistinct) {
this.isDistinct = isDistinct;
}
+ /**
+ * Returns the fraction of hash-table entries flushed on a partial flush. GroupByOperator reads
+ * this value during initialization and multiplies the current hash-table size by it to obtain
+ * the number of entries to flush.
+ *
+ * @return the flush fraction as passed to the constructor or to {@link #setHashAggrFlushPercent(float)}
+ */
+ public float getHashAggrFlushPercent() {
+ return hashAggrFlushPercent;
+ }
+
+ /**
+ * Sets the fraction of hash-table entries to flush on a partial flush.
+ * The value is stored as-is; no range check is applied here.
+ *
+ * @param hashAggrFlushPercent multiplier applied to the hash-table size to obtain the flush size
+ */
+ public void setHashAggrFlushPercent(float hashAggrFlushPercent) {
+ this.hashAggrFlushPercent = hashAggrFlushPercent;
+ }
+
@Override
public Object clone() {
List<String> outputColumnNames = new ArrayList<>();
@@ -348,7 +360,7 @@ public Object clone() {
listGroupingSets.addAll(this.listGroupingSets);
return new GroupByDesc(this.mode, outputColumnNames, keys, aggregators,
this.groupByMemoryUsage, this.memoryThreshold, this.minReductionHashAggr, this.minReductionHashAggrLowerBound,
- listGroupingSets, this.groupingSetsPresent,
+ this.hashAggrFlushPercent, listGroupingSets, this.groupingSetsPresent,
this.groupingSetPosition, this.isDistinct);
}