[CALCITE-6214] Remove DISTINCT in aggregate function if field is unique
diff --git a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 9d8d32b..ad58474 100644
--- a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -2430,7 +2430,11 @@
     RelNode r = frame.rel;
     final List<AggregateCall> aggregateCalls = new ArrayList<>();
     for (AggCallPlus aggCall : aggCalls) {
-      aggregateCalls.add(aggCall.aggregateCall(registrar, groupSet, r));
+      AggregateCall aggregateCall = aggCall.aggregateCall(registrar, groupSet, r);
+      if (groupSets.size() <= 1) {
+        aggregateCall = removeRedundantAggregateDistinct(aggregateCall, groupSet, r);
+      }
+      aggregateCalls.add(aggregateCall);
     }
 
     assert ImmutableBitSet.ORDERING.isStrictlyOrdered(groupSets) : groupSets;
@@ -2525,6 +2529,30 @@
     return project(projects.transform((i, name) -> aliasMaybe(field(i), name)));
   }
 
+  /**
+   * Removed redundant distinct if an input is already unique.
+   */
+  private AggregateCall removeRedundantAggregateDistinct(
+      AggregateCall aggregateCall,
+      ImmutableBitSet groupSet,
+      RelNode relNode) {
+    if (aggregateCall.isDistinct() && config.removeRedundantDistinct()) {
+      final RelMetadataQuery mq = relNode.getCluster().getMetadataQuery();
+      final List<Integer> argList = aggregateCall.getArgList();
+      final ImmutableBitSet distinctArg = ImmutableBitSet.builder()
+          .addAll(argList)
+          .build();
+      final ImmutableBitSet columns = groupSet.union(distinctArg);
+      final Boolean alreadyUnique =
+          mq.areColumnsUnique(relNode, columns);
+      if (alreadyUnique != null && alreadyUnique) {
+        // columns have been distinct or columns are primary keys
+        return aggregateCall.withDistinct(false);
+      }
+    }
+    return aggregateCall;
+  }
+
   /** Returns whether an input is already unique, and therefore a Project
    * can be created instead of an Aggregate.
    *
@@ -4902,6 +4930,18 @@
 
     /** Sets {@link #convertCorrelateToJoin()}. */
     Config withConvertCorrelateToJoin(boolean convertCorrelateToJoin);
+
+    /** Whether to remove the distinct that in aggregate if we know that the input is
+     * already unique; default false. */
+    @Value.Default
+    default boolean removeRedundantDistinct() {
+      return false;
+    }
+
+    /**
+     * Sets {@link #removeRedundantDistinct()}.
+     */
+    Config withRemoveRedundantDistinct(boolean removeRedundantDistinct);
   }
 
 }
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 810a1b3..e62eeb7 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -5079,6 +5079,128 @@
   }
 
   /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6214">[CALCITE-6214]
+   * Remove DISTINCT in COUNT if field is unique</a>. */
+  @Test void testRemoveDistinctIfUnique1() {
+    final String sql = "SELECT\n"
+        + "    deptno,\n"
+        + "    COUNT(DISTINCT sal) as cds,\n"
+        + "    COUNT(sal) as cs,\n"
+        + "    SUM(DISTINCT sal) AS sds,\n"
+        + "    SUM(sal) AS ss\n"
+        + "FROM (\n"
+        + "    SELECT DISTINCT deptno, sal\n"
+        + "    FROM emp)\n"
+        + "GROUP BY deptno";
+    sql(sql)
+        .withConfig(c ->
+            c.addRelBuilderConfigTransform(c2 ->
+                c2.withRemoveRedundantDistinct(true))).ok();
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6214">[CALCITE-6214]
+   * Remove DISTINCT in COUNT if field is unique</a>. */
+  @Test void testRemoveDistinctIfUnique2() {
+    final String sql = "SELECT\n"
+        + "    COUNT(DISTINCT sal) as cds,\n"
+        + "    COUNT(sal) as cs,\n"
+        + "    SUM(DISTINCT sal) AS sds,\n"
+        + "    SUM(sal) AS ss\n"
+        + "FROM (\n"
+        + "    SELECT deptno, 1 as sal\n"
+        + "    FROM emp"
+        + "    GROUP BY deptno)"
+        + "GROUP BY deptno\n";
+    sql(sql)
+        .withConfig(c ->
+            c.addRelBuilderConfigTransform(c2 ->
+                c2.withRemoveRedundantDistinct(true))).ok();
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6214">[CALCITE-6214]
+   * Remove DISTINCT in COUNT if field is unique</a>. */
+  @Test void testRemoveDistinctIfUnique3() {
+    final String sql = "SELECT\n"
+        + "    COUNT(DISTINCT sal) as cds,\n"
+        + "    COUNT(sal) as cs,\n"
+        + "    SUM(DISTINCT sal) AS sds,\n"
+        + "    SUM(sal) AS ss\n"
+        + "FROM (\n"
+        + "    SELECT DISTINCT deptno, sal\n"
+        + "    FROM emp)\n";
+    sql(sql)
+        .withConfig(c ->
+            c.addRelBuilderConfigTransform(c2 ->
+                c2.withRemoveRedundantDistinct(true))).ok();
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6214">[CALCITE-6214]
+   * Remove DISTINCT in COUNT if field is unique</a>. */
+  @Test void testRemoveDistinctIfUnique4() {
+    final String sql = "SELECT\n"
+        + "    COUNT(DISTINCT sal) as cds,\n"
+        + "    COUNT(sal) as cs,\n"
+        + "    SUM(DISTINCT sal) AS sds,\n"
+        + "    SUM(sal) AS ss\n"
+        + "FROM (\n"
+        + "    SELECT deptno, sal\n"
+        + "    FROM emp"
+        + "    GROUP BY deptno, sal)"
+        + "GROUP BY deptno\n";
+    // Default save redundant distinct
+    sql(sql).ok();
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6214">[CALCITE-6214]
+   * Remove DISTINCT in COUNT if field is unique</a>. */
+  @Test void testRemoveDistinctIfUnique5() {
+    // empno is unique key
+    final String sql = "SELECT COUNT(DISTINCT empno)\n"
+        + "FROM emp\n";
+    // Default save redundant distinct
+    sql(sql)
+        .withConfig(c ->
+            c.addRelBuilderConfigTransform(c2 ->
+                c2.withRemoveRedundantDistinct(true))).ok();
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6214">[CALCITE-6214]
+   * Remove DISTINCT in COUNT if field is unique</a>.
+   * See {@link org.apache.calcite.test.catalog.MockCatalogReaderSimple#registerTableEmp}
+   * */
+  @Test void testRemoveDistinctIfUnique6() {
+    // empno is unique key in emp table
+    final String sql = "SELECT deptno, COUNT(DISTINCT empno)\n"
+        + "FROM emp\n"
+        + "GROUP BY deptno";
+    // Default save redundant distinct
+    sql(sql)
+        .withConfig(c ->
+            c.addRelBuilderConfigTransform(c2 ->
+                c2.withRemoveRedundantDistinct(true))).ok();
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6214">[CALCITE-6214]
+   * Remove DISTINCT in COUNT if field is unique</a>. */
+  @Test void testRemoveDistinctIfUnique7() {
+    // empno is unique key
+    final String sql = "SELECT deptno, COUNT(DISTINCT empno)\n"
+        + "FROM emp\n"
+        + "GROUP BY ROLLUP(deptno)";
+    // Default save redundant distinct
+    sql(sql)
+        .withConfig(c ->
+            c.addRelBuilderConfigTransform(c2 ->
+                c2.withRemoveRedundantDistinct(true))).ok();
+  }
+
+  /** Test case for
    * <a href="https://issues.apache.org/jira/browse/CALCITE-5089">[CALCITE-5089]
    * Allow GROUP BY ALL or DISTINCT set quantifier on GROUPING SETS</a>. */
   @Test void testGroupByDistinct() {
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index aeab9a0..1bf1556 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -6346,6 +6346,138 @@
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRemoveDistinctIfUnique1">
+    <Resource name="sql">
+      <![CDATA[SELECT
+    deptno,
+    COUNT(DISTINCT sal) as cds,
+    COUNT(sal) as cs,
+    SUM(DISTINCT sal) AS sds,
+    SUM(sal) AS ss
+FROM (
+    SELECT DISTINCT deptno, sal
+    FROM emp)
+GROUP BY deptno]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(DEPTNO=[$0], CDS=[$1], CS=[$2], SDS=[$3], SS=[$3])
+  LogicalAggregate(group=[{0}], CDS=[COUNT($1)], CS=[COUNT()], SDS=[SUM($1)])
+    LogicalAggregate(group=[{0, 1}])
+      LogicalProject(DEPTNO=[$7], SAL=[$5])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRemoveDistinctIfUnique2">
+    <Resource name="sql">
+      <![CDATA[SELECT
+    COUNT(DISTINCT sal) as cds,
+    COUNT(sal) as cs,
+    SUM(DISTINCT sal) AS sds,
+    SUM(sal) AS ss
+FROM (
+    SELECT deptno, 1 as sal
+    FROM emp    GROUP BY deptno)GROUP BY deptno
+]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(CDS=[$1], CS=[$2], SDS=[$3], SS=[$3])
+  LogicalAggregate(group=[{0}], CDS=[COUNT($1)], CS=[COUNT()], SDS=[SUM($1)])
+    LogicalProject(DEPTNO=[$0], SAL=[1])
+      LogicalAggregate(group=[{0}])
+        LogicalProject(DEPTNO=[$7])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRemoveDistinctIfUnique3">
+    <Resource name="sql">
+      <![CDATA[SELECT
+    COUNT(DISTINCT sal) as cds,
+    COUNT(sal) as cs,
+    SUM(DISTINCT sal) AS sds,
+    SUM(sal) AS ss
+FROM (
+    SELECT DISTINCT deptno, sal
+    FROM emp)
+]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalAggregate(group=[{}], CDS=[COUNT(DISTINCT $0)], CS=[COUNT()], SDS=[SUM(DISTINCT $0)], SS=[SUM($0)])
+  LogicalProject(SAL=[$1])
+    LogicalAggregate(group=[{0, 1}])
+      LogicalProject(DEPTNO=[$7], SAL=[$5])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRemoveDistinctIfUnique4">
+    <Resource name="sql">
+      <![CDATA[SELECT
+    COUNT(DISTINCT sal) as cds,
+    COUNT(sal) as cs,
+    SUM(DISTINCT sal) AS sds,
+    SUM(sal) AS ss
+FROM (
+    SELECT deptno, sal
+    FROM emp    GROUP BY deptno, sal)GROUP BY deptno
+]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(CDS=[$1], CS=[$2], SDS=[$3], SS=[$4])
+  LogicalAggregate(group=[{0}], CDS=[COUNT(DISTINCT $1)], CS=[COUNT()], SDS=[SUM(DISTINCT $1)], SS=[SUM($1)])
+    LogicalAggregate(group=[{0, 1}])
+      LogicalProject(DEPTNO=[$7], SAL=[$5])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRemoveDistinctIfUnique5">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT empno)
+FROM emp
+]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)])
+  LogicalProject(EMPNO=[$0])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRemoveDistinctIfUnique6">
+    <Resource name="sql">
+      <![CDATA[SELECT deptno, COUNT(DISTINCT empno)
+FROM emp
+GROUP BY deptno]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+  LogicalProject(DEPTNO=[$7], EMPNO=[$0])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRemoveDistinctIfUnique7">
+    <Resource name="sql">
+      <![CDATA[SELECT deptno, COUNT(DISTINCT empno)
+FROM emp
+GROUP BY ROLLUP(deptno)]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalAggregate(group=[{0}], groups=[[{0}, {}]], EXPR$1=[COUNT(DISTINCT $1)])
+  LogicalProject(DEPTNO=[$7], EMPNO=[$0])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testRollup">
     <Resource name="sql">
       <![CDATA[select 1