DRILL-8013: Drill attempts to push "$SUM0" to JDBC storage plugin for AVG (#2521)

diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
index b31cff7..dd11ce5 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
@@ -313,4 +313,16 @@
       .exclude("Limit\\(")
       .match();
   }
+
+  @Test // DRILL-8013
+  public void testAvgFunction() throws Exception {
+    String query = "select avg(person_id) `avg` from pg.`public`.person";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("avg")
+      .baselineValues(2.75)
+      .go();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
index 0097665..062fda0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillReduceAggregatesRule.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.logical;
 
+import org.apache.drill.exec.planner.sql.DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -340,7 +341,8 @@
             sumType,
             sumType.isNullable() || nGroups == 0);
     SqlAggFunction sumAgg =
-        new DrillCalciteSqlAggFunctionWrapper(new SqlSumEmptyIsZeroAggFunction(), sumType);
+        new DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
+          new SqlSumEmptyIsZeroAggFunction(), sumType);
     AggregateCall sumCall = AggregateCall.create(sumAgg, oldCall.isDistinct(),
         oldCall.isApproximate(), oldCall.getArgList(), -1, sumType, null);
     final SqlCountAggFunction countAgg = (SqlCountAggFunction) SqlStdOperatorTable.COUNT;
@@ -437,7 +439,7 @@
           typeFactory.createTypeWithNullability(
               oldCall.getType(), argType.isNullable());
     }
-    sumZeroAgg = new DrillCalciteSqlAggFunctionWrapper(
+    sumZeroAgg = new DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
         new SqlSumEmptyIsZeroAggFunction(), sumType);
     AggregateCall sumZeroCall = AggregateCall.create(sumZeroAgg, oldCall.isDistinct(),
         oldCall.isApproximate(), oldCall.getArgList(), -1, sumType, null);
@@ -713,7 +715,7 @@
           final RelDataType argType = oldAggregateCall.getType();
           final RelDataType sumType = oldAggRel.getCluster().getTypeFactory()
               .createTypeWithNullability(argType, argType.isNullable());
-          final SqlAggFunction sumZeroAgg = new DrillCalciteSqlAggFunctionWrapper(
+          final SqlAggFunction sumZeroAgg = new DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
               new SqlSumEmptyIsZeroAggFunction(), sumType);
           AggregateCall sumZeroCall =
               AggregateCall.create(
@@ -775,7 +777,7 @@
             final RelDataType argType = rexWinAggCall.getType();
             final RelDataType sumType = oldWinRel.getCluster().getTypeFactory()
                 .createTypeWithNullability(argType, argType.isNullable());
-            final SqlAggFunction sumZeroAgg = new DrillCalciteSqlAggFunctionWrapper(
+            final SqlAggFunction sumZeroAgg = new DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
                 new SqlSumEmptyIsZeroAggFunction(), sumType);
             final Window.RexWinAggCall sumZeroCall =
                 new Window.RexWinAggCall(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper.java
new file mode 100644
index 0000000..7f221a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Util;
+import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+
+import java.util.List;
+
+/**
+ * This class serves as a wrapper class for {@link SqlSumEmptyIsZeroAggFunction}
+ * with the same goal as {@link DrillCalciteSqlAggFunctionWrapper}
+ * but extends {@link SqlSumEmptyIsZeroAggFunction} to allow using
+ * additional Calcite functionality designated for {@link SqlSumEmptyIsZeroAggFunction}.
+ */
+public class DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper
+  extends SqlSumEmptyIsZeroAggFunction implements DrillCalciteSqlWrapper {
+
+  private final SqlAggFunction operator;
+
+  private final SqlReturnTypeInference sqlReturnTypeInference;
+
+  private DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
+    SqlSumEmptyIsZeroAggFunction sqlAggFunction,
+    SqlReturnTypeInference sqlReturnTypeInference) {
+    this.sqlReturnTypeInference = sqlReturnTypeInference;
+    this.operator = sqlAggFunction;
+  }
+
+  public DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
+    SqlSumEmptyIsZeroAggFunction sqlAggFunction,
+    List<DrillFuncHolder> functions) {
+    this(sqlAggFunction,
+      TypeInferenceUtils.getDrillSqlReturnTypeInference(
+        sqlAggFunction.getName(),
+        functions));
+  }
+
+  public DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
+    SqlSumEmptyIsZeroAggFunction sqlAggFunction,
+    RelDataType relDataType) {
+    this(sqlAggFunction, opBinding -> relDataType);
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return operator;
+  }
+
+  @Override
+  public SqlReturnTypeInference getReturnTypeInference() {
+    return this.sqlReturnTypeInference;
+  }
+
+  @Override
+  public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+    if (this.sqlReturnTypeInference != null) {
+      RelDataType returnType = this.sqlReturnTypeInference.inferReturnType(opBinding);
+      if (returnType == null) {
+        throw new IllegalArgumentException(String.format(
+          "Cannot infer return type for %s; operand types: %s",
+          opBinding.getOperator(), opBinding.collectOperandTypes()));
+      } else {
+        return returnType;
+      }
+    } else {
+      throw Util.needToImplement(this);
+    }
+  }
+
+  @Override
+  public boolean validRexOperands(int count, Litmus litmus) {
+    return true;
+  }
+
+  @Override
+  public String getAllowedSignatures(String opNameToUse) {
+    return operator.getAllowedSignatures(opNameToUse);
+  }
+
+  @Override
+  public boolean isAggregator() {
+    return operator.isAggregator();
+  }
+
+  @Override
+  public boolean allowsFraming() {
+    return operator.allowsFraming();
+  }
+
+  @Override
+  public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
+    return operator.getMonotonicity(call);
+  }
+
+  @Override
+  public boolean isDeterministic() {
+    return operator.isDeterministic();
+  }
+
+  @Override
+  public boolean isDynamicFunction() {
+    return operator.isDynamicFunction();
+  }
+
+  @Override
+  public boolean requiresDecimalExpansion() {
+    return operator.requiresDecimalExpansion();
+  }
+
+  @Override
+  public boolean argumentMustBeScalar(int ordinal) {
+    return operator.argumentMustBeScalar(ordinal);
+  }
+
+  @Override
+  public boolean checkOperandTypes(
+    SqlCallBinding callBinding,
+    boolean throwOnFailure) {
+    return true;
+  }
+
+  @Override
+  public SqlSyntax getSyntax() {
+    return operator.getSyntax();
+  }
+
+  @Override
+  public List<String> getParamNames() {
+    return operator.getParamNames();
+  }
+
+  @Override
+  public String getSignatureTemplate(final int operandsCount) {
+    return operator.getSignatureTemplate(operandsCount);
+  }
+
+  @Override
+  public RelDataType deriveType(
+    SqlValidator validator,
+    SqlValidatorScope scope,
+    SqlCall call) {
+    return operator.deriveType(validator, scope, call);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index d402257..0b112eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.sql;
 
+import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
 import org.apache.calcite.sql.validate.SqlNameMatcher;
 import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -169,7 +170,11 @@
   private void populateWrappedCalciteOperators() {
     for (SqlOperator calciteOperator : inner.getOperatorList()) {
       final SqlOperator wrapper;
-      if (calciteOperator instanceof SqlAggFunction) {
+      if (calciteOperator instanceof SqlSumEmptyIsZeroAggFunction) {
+        wrapper = new DrillCalciteSqlSumEmptyIsZeroAggFunctionWrapper(
+          (SqlSumEmptyIsZeroAggFunction) calciteOperator,
+          getFunctionListWithInference(calciteOperator.getName()));
+      } else if (calciteOperator instanceof SqlAggFunction) {
         wrapper = new DrillCalciteSqlAggFunctionWrapper((SqlAggFunction) calciteOperator,
             getFunctionListWithInference(calciteOperator.getName()));
       } else if (calciteOperator instanceof SqlFunction) {