HIVE-28428: Performance regression in map-hash aggregation (Ryu Kobayashi, reviewed by Denys Kuzmenko)

Closes #5380
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index af3a757..51dc93d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2012,6 +2012,8 @@ public static enum ConfVars {
         "Set to 1 to make sure hash aggregation is never turned off."),
     HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND("hive.map.aggr.hash.min.reduction.lower.bound", (float) 0.4,
         "Lower bound of Hash aggregate reduction filter. See also: hive.map.aggr.hash.min.reduction"),
+    HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT("hive.map.aggr.hash.flush.size.percent", (float) 0.1,
+        "Percentage of hash table entries to flush in map-side group aggregation."),
     HIVE_MAP_AGGR_HASH_MIN_REDUCTION_STATS_ADJUST("hive.map.aggr.hash.min.reduction.stats", true,
         "Whether the value for hive.map.aggr.hash.min.reduction should be set statically using stats estimates. \n" +
         "If this is enabled, the default value for hive.map.aggr.hash.min.reduction is only used as an upper-bound\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 326c351..88b7d54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -119,6 +119,7 @@ public class GroupByOperator extends Operator<GroupByDesc> implements IConfigure
   private transient int groupbyMapAggrInterval;
   private transient long numRowsCompareHashAggr;
   private transient float minReductionHashAggr;
+  private transient float hashAggrFlushPercent;
 
   private transient int outputKeyLength;
 
@@ -372,6 +373,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       // compare every groupbyMapAggrInterval rows
       numRowsCompareHashAggr = groupbyMapAggrInterval;
       minReductionHashAggr = conf.getMinReductionHashAggr();
+      hashAggrFlushPercent = conf.getHashAggrFlushPercent();
     }
 
     List<String> fieldNames = new ArrayList<String>(conf.getOutputColumnNames());
@@ -951,9 +953,6 @@ else if (o instanceof ByteArrayRef){
   private void flushHashTable(boolean complete) throws HiveException {
     countAfterReport = 0;
 
-    // Currently, the algorithm flushes 10% of the entries - this can be
-    // changed in the future
-
     if (complete) {
       for (Map.Entry<KeyWrapper, AggregationBuffer[]> entry : hashAggregations.entrySet()) {
         forward(entry.getKey().getKeyArray(), entry.getValue());
@@ -964,7 +963,8 @@ private void flushHashTable(boolean complete) throws HiveException {
     }
 
     int oldSize = hashAggregations.size();
-    LOG.info("Hash Tbl flush: #hash table = {}", oldSize);
+    int flushSize = (int) (oldSize * hashAggrFlushPercent);
+    LOG.trace("Hash Tbl flush: #hash table = {}, flush size = {}", oldSize, flushSize);
 
     Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> iter = hashAggregations
         .entrySet().iterator();
@@ -974,8 +974,8 @@ private void flushHashTable(boolean complete) throws HiveException {
       forward(m.getKey().getKeyArray(), m.getValue());
       iter.remove();
       numDel++;
-      if (numDel * 10 >= oldSize) {
-        LOG.info("Hash Table flushed: new size = {}", hashAggregations.size());
+      if (numDel >= flushSize) {
+        LOG.trace("Hash Table flushed: new size = {}", hashAggregations.size());
         return;
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index bc4bff8..7f473e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -502,6 +502,9 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
     float minReductionHashAggrLowerBound =
         HiveConf.getFloatVar(parseContext.getConf(),
             ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent =
+        HiveConf.getFloatVar(parseContext.getConf(),
+            ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
 
     List<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
     ExprNodeDesc groupByExpr =
@@ -511,7 +514,8 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars
     GroupByDesc groupBy =
         new GroupByDesc(GroupByDesc.Mode.HASH, outputNames, groupByExprs,
             new ArrayList<AggregationDesc>(), false, groupByMemoryUsage, memoryThreshold,
-            minReductionHashAggr, minReductionHashAggrLowerBound, null, false, -1, true);
+            minReductionHashAggr, minReductionHashAggrLowerBound,
+            hashAggrFlushPercent, null, false, -1, true);
 
     List<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
     groupbyColInfos.add(new ColumnInfo(outputNames.get(0), key.getTypeInfo(), "", false));
@@ -623,6 +627,9 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
     float minReductionHashAggrLowerBound =
         HiveConf.getFloatVar(parseContext.getConf(),
             ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent =
+        HiveConf.getFloatVar(parseContext.getConf(),
+            ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
 
     // Add min/max and bloom filter aggregations
     List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
@@ -673,7 +680,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
     GroupByDesc groupBy = new GroupByDesc(GroupByDesc.Mode.HASH,
         gbOutputNames, new ArrayList<ExprNodeDesc>(), aggs, false,
         groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
-            null, false, -1, false);
+            hashAggrFlushPercent, null, false, -1, false);
 
     List<ColumnInfo> groupbyColInfos = new ArrayList<ColumnInfo>();
     groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(0), key.getTypeInfo(), "", false));
@@ -771,7 +778,7 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
     GroupByDesc groupByDescFinal = new GroupByDesc(GroupByDesc.Mode.FINAL,
             gbOutputNames, new ArrayList<ExprNodeDesc>(), aggsFinal, false,
             groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
-            null, false, 0, false);
+            hashAggrFlushPercent, null, false, 0, false);
     GroupByOperator groupByOpFinal = (GroupByOperator)OperatorFactory.getAndMakeChild(
             groupByDescFinal, new RowSchema(rsOp.getSchema()), rsOp);
     groupByOpFinal.setColumnExprMap(new HashMap<String, ExprNodeDesc>());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
index d3764dc..37cd180 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
@@ -403,9 +403,10 @@ private static GroupByOperator createGroupBy(SelectOperator selectOp, Operator<?
     float memoryThreshold = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_MEMORY_THRESHOLD);
     float minReductionHashAggr = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
     float minReductionHashAggrLowerBound = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
     GroupByDesc groupBy =
         new GroupByDesc(gbMode, gbOutputNames, Collections.emptyList(), gbAggs, false, groupByMemoryUsage,
-            memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound, null, false, -1, false);
+            memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound, hashAggrFlushPercent, null, false, -1, false);
     groupBy.setColumnExprMap(Collections.emptyMap());
     return (GroupByOperator) OperatorFactory.getAndMakeChild(groupBy, new RowSchema(gbColInfos), parentOp);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
index 5142f4f..871c31b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
@@ -124,6 +124,7 @@ private static class GBInfo {
     float                             memoryThreshold;
     float                             minReductionHashAggr;
     float                             minReductionHashAggrLowerBound;
+    float                             hashAggrFlushPercent;
 
     private HIVEGBPHYSICALMODE        gbPhysicalPipelineMode;
 
@@ -834,7 +835,7 @@ private static OpAttr genReduceSideGB2(OpAttr inputOpAf, GBInfo gbInfo) throws S
     Operator rsGBOp2 = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.FINAL,
         outputColNames, gbKeys, aggregations, false, gbInfo.groupByMemoryUsage,
         gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, gbInfo.minReductionHashAggrLowerBound,
-        null, false, groupingSetsPosition, gbInfo.containsDistinctAggr),
+        gbInfo.hashAggrFlushPercent, null, false, groupingSetsPosition, gbInfo.containsDistinctAggr),
         new RowSchema(colInfoLst), rs);
 
     rsGBOp2.setColumnExprMap(colExprMap);
@@ -973,7 +974,7 @@ private static OpAttr genReduceSideGB1(OpAttr inputOpAf, GBInfo gbInfo, boolean
         && !(gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT);
     Operator rsGBOp = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames,
         gbKeys, aggregations, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold,
-        gbInfo.minReductionHashAggr, gbInfo.minReductionHashAggrLowerBound,
+        gbInfo.minReductionHashAggr, gbInfo.minReductionHashAggrLowerBound, gbInfo.hashAggrFlushPercent,
         gbInfo.grpSets, includeGrpSetInGBDesc, groupingSetsColPosition, gbInfo.containsDistinctAggr),
         new RowSchema(colInfoLst), rs);
 
@@ -1116,7 +1117,7 @@ private static OpAttr genReduceSideGB1NoMapGB(OpAttr inputOpAf, GBInfo gbInfo,
 
     Operator rsGB1 = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames,
         gbKeys, aggregations, false, gbInfo.groupByMemoryUsage, gbInfo.minReductionHashAggrLowerBound,
-        gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, null,
+        gbInfo.hashAggrFlushPercent, gbInfo.memoryThreshold, gbInfo.minReductionHashAggr, null,
         false, -1, numDistinctUDFs > 0), new RowSchema(colInfoLst), rs);
     rsGB1.setColumnExprMap(colExprMap);
 
@@ -1213,7 +1214,7 @@ private static OpAttr genMapSideGB(OpAttr inputOpAf, GBInfo gbAttrs) throws Sema
     Operator gbOp = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.HASH,
         outputColNames, gbKeys, aggregations, false, gbAttrs.groupByMemoryUsage,
         gbAttrs.memoryThreshold, gbAttrs.minReductionHashAggr, gbAttrs.minReductionHashAggrLowerBound,
-        gbAttrs.grpSets, inclGrpID, groupingSetsPosition,
+        gbAttrs.hashAggrFlushPercent, gbAttrs.grpSets, inclGrpID, groupingSetsPosition,
         gbAttrs.containsDistinctAggr), new RowSchema(colInfoLst), inputOpAf.inputs.get(0));
 
     // 5. Setup Expr Col Map
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index ca9d599..cd19ce7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -5418,11 +5418,13 @@ private Operator genGroupByPlanGroupByOperator(QBParseInfo parseInfo,
         .getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
     float minReductionHashAggrLowerBound = HiveConf
         .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent = HiveConf
+        .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
 
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
             false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
-                null, false, -1, numDistinctUDFs > 0),
+                hashAggrFlushPercent, null, false, -1, numDistinctUDFs > 0),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         input), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -5687,6 +5689,8 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo,
         .getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
     float minReductionHashAggrLowerBound = HiveConf
             .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent = HiveConf
+        .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
 
     // Nothing special needs to be done for grouping sets if
     // this is the final group by operator, and multiple rows corresponding to the
@@ -5696,6 +5700,7 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo,
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
             groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
+            hashAggrFlushPercent,
             groupingSets,
             groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
             groupingSetsPosition, containsDistinctAggr),
@@ -5868,10 +5873,12 @@ private Operator genGroupByPlanMapGroupByOperator(QB qb,
         .getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
     float minReductionHashAggrLowerBound = HiveConf
             .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent = HiveConf
+        .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(GroupByDesc.Mode.HASH, outputColumnNames, groupByKeys, aggregations,
             false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
-            groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
+            hashAggrFlushPercent, groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -6405,11 +6412,13 @@ private Operator genGroupByPlanGroupByOperator2MR(QBParseInfo parseInfo,
         .getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
     float minReductionHashAggrLowerBound = HiveConf
             .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent = HiveConf
+        .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
 
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(GroupByDesc.Mode.FINAL, outputColumnNames, groupByKeys, aggregations,
             false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
-                null, false,
+                hashAggrFlushPercent, null, false,
             groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
         reduceSinkOperatorInfo2), groupByOutputRowResolver2);
@@ -10108,10 +10117,12 @@ private Operator genMapGroupByForSemijoin(List<ASTNode> fields, Operator<?> inpu
         .getFloatVar(conf, HiveConf.ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION);
     float minReductionHashAggrLowerBound = HiveConf
             .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_MIN_REDUCTION_LOWER_BOUND);
+    float hashAggrFlushPercent = HiveConf
+        .getFloatVar(conf, ConfVars.HIVE_MAP_AGGR_HASH_FLUSH_SIZE_PERCENT);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(GroupByDesc.Mode.HASH, outputColumnNames, groupByKeys, aggregations,
             false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
-                null, false, -1, false),
+                hashAggrFlushPercent, null, false, -1, false),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         input), groupByOutputRowResolver);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index 6fe0cd7..f8f1daa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -74,6 +74,7 @@ public static enum Mode {
   private float memoryThreshold;
   private float minReductionHashAggr;
   private float minReductionHashAggrLowerBound;
+  private float hashAggrFlushPercent;
   transient private boolean isDistinct;
   private boolean dontResetAggrsDistinct;
 
@@ -89,13 +90,14 @@ public GroupByDesc(
       final float memoryThreshold,
       final float minReductionHashAggr,
       final float minReductionHashAggrLowerBound,
+      final float hashAggrFlushPercent,
       final List<Long> listGroupingSets,
       final boolean groupingSetsPresent,
       final int groupingSetsPosition,
       final boolean isDistinct) {
     this(mode, outputColumnNames, keys, aggregators,
         false, groupByMemoryUsage, memoryThreshold, minReductionHashAggr, minReductionHashAggrLowerBound,
-            listGroupingSets, groupingSetsPresent, groupingSetsPosition, isDistinct);
+            hashAggrFlushPercent, listGroupingSets, groupingSetsPresent, groupingSetsPosition, isDistinct);
   }
 
   public GroupByDesc(
@@ -108,6 +110,7 @@ public GroupByDesc(
       final float memoryThreshold,
       final float minReductionHashAggr,
       final float minReductionHashAggrLowerBound,
+      final float hashAggrFlushPercent,
       final List<Long> listGroupingSets,
       final boolean groupingSetsPresent,
       final int groupingSetsPosition,
@@ -121,6 +124,7 @@ public GroupByDesc(
     this.memoryThreshold = memoryThreshold;
     this.minReductionHashAggr = minReductionHashAggr;
     this.minReductionHashAggrLowerBound = minReductionHashAggrLowerBound;
+    this.hashAggrFlushPercent = hashAggrFlushPercent;
     this.listGroupingSets = listGroupingSets;
     this.groupingSetsPresent = groupingSetsPresent;
     this.groupingSetPosition = groupingSetsPosition;
@@ -336,6 +340,14 @@ public void setDistinct(boolean isDistinct) {
     this.isDistinct = isDistinct;
   }
 
+  public float getHashAggrFlushPercent() {
+    return hashAggrFlushPercent;
+  }
+
+  public void setHashAggrFlushPercent(float hashAggrFlushPercent) {
+    this.hashAggrFlushPercent = hashAggrFlushPercent;
+  }
+
   @Override
   public Object clone() {
     List<String> outputColumnNames = new ArrayList<>();
@@ -348,7 +360,7 @@ public Object clone() {
     listGroupingSets.addAll(this.listGroupingSets);
     return new GroupByDesc(this.mode, outputColumnNames, keys, aggregators,
         this.groupByMemoryUsage, this.memoryThreshold, this.minReductionHashAggr, this.minReductionHashAggrLowerBound,
-        listGroupingSets, this.groupingSetsPresent,
+        this.hashAggrFlushPercent, listGroupingSets, this.groupingSetsPresent,
         this.groupingSetPosition, this.isDistinct);
   }