[CALCITE-4817] Expand SubstitutionVisitor of Aggregate with max/min, which column is the group by list of target (xurenhe)

Close #2556
diff --git a/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java b/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
index c0899fa..25423b4 100644
--- a/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
+++ b/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
@@ -61,6 +61,7 @@
 import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Optionality;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.mapping.Mapping;
@@ -1935,12 +1936,15 @@
       }
       final List<AggregateCall> aggregateCalls = new ArrayList<>();
       for (AggregateCall aggregateCall : query.aggCalls) {
-        if (aggregateCall.isDistinct() && aggregateCall.getArgList().size() == 1) {
+        final SqlAggFunction aggregation = aggregateCall.getAggregation();
+        if ((aggregateCall.isDistinct()
+            || aggregation.getDistinctOptionality() == Optionality.IGNORED)
+            && aggregateCall.getArgList().size() == 1) {
           final int aggIndex = aggregateCall.getArgList().get(0);
           final int newIndex = targetGroupByIndexList.indexOf(aggIndex);
           if (newIndex >= 0) {
             aggregateCalls.add(
-                AggregateCall.create(aggregateCall.getAggregation(),
+                AggregateCall.create(aggregation,
                     aggregateCall.isDistinct(), aggregateCall.isApproximate(),
                     aggregateCall.ignoreNulls(),
                     ImmutableList.of(newIndex), -1, aggregateCall.distinctKeys,
@@ -1956,7 +1960,7 @@
         }
         // When an SqlAggFunction does not support roll up, it will return null, which means that
         // it cannot do secondary aggregation and the materialization recognition will fail.
-        final SqlAggFunction aggFunction = aggregateCall.getAggregation().getRollup();
+        final SqlAggFunction aggFunction = aggregation.getRollup();
         if (aggFunction == null) {
           return null;
         }
diff --git a/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java b/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java
index dede0a7..4f54ed8 100644
--- a/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java
@@ -1659,6 +1659,47 @@
     sql(mv, query).ok();
   }
 
+  @Test void testQueryNoDistinctOptionalityAggCallColInTargetGroupBy1() {
+    final String mv = ""
+        + "select \"name\", \"deptno\" "
+        + "from \"emps\" group by \"name\", \"deptno\"";
+    final String query = ""
+        + "select \"name\", min(\"deptno\")\n"
+        + "from \"emps\" group by \"name\"";
+    sql(mv, query).ok();
+  }
+
+  @Test void testQueryNoDistinctOptionalityAggCallColInTargetGroupBy2() {
+    final String mv = ""
+        + "select \"name\", \"commission\", \"deptno\"\n"
+        + "from \"emps\" group by \"name\", \"commission\", \"deptno\"";
+    final String query = ""
+        + "select \"name\", \"commission\", max(\"deptno\") as cnt\n"
+        + "from \"emps\" group by \"name\", \"commission\"";
+    sql(mv, query).ok();
+  }
+
+  @Test void testQueryNoDistinctOptionalityAggCallColInTargetGroupBy3() {
+    final String mv = ""
+        + "select \"name\", \"deptno\", \"empid\", count(\"commission\")\n"
+        + "from \"emps\" group by \"name\", \"deptno\", \"empid\"";
+    final String query = ""
+        + "select \"name\", max(\"deptno\"), count(distinct \"empid\"), count"
+        + "(\"commission\")\n"
+        + "from \"emps\" group by \"name\"";
+    sql(mv, query).ok();
+  }
+
+  @Test void testQueryNoDistinctOptionalityAggCallColInTargetGroupBy4() {
+    final String mv = ""
+        + "select \"name\", \"deptno\", \"empid\"\n"
+        + "from \"emps\" group by \"name\", \"deptno\", \"empid\"";
+    final String query = ""
+        + "select \"name\", min(\"deptno\")\n"
+        + "from \"emps\" group by \"name\"";
+    sql(mv, query).ok();
+  }
+
   @Test void testRexPredicate() {
     final String mv = ""
         + "select \"name\"\n"