DRILL-7491: Incorrect count() returned for complex types in parquet

closes #1955
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 40ab594..9293a8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -167,29 +167,46 @@
    */
   @Override
   public long getColumnValueCount(SchemaPath column) {
-    long tableRowCount, colNulls;
-    Long nulls;
     ColumnStatistics<?> columnStats = getTableMetadata().getColumnStatistics(column);
-    ColumnStatistics<?> nonInterestingColStats = null;
-    if (columnStats == null) {
-      nonInterestingColStats = getNonInterestingColumnsMetadata().getColumnStatistics(column);
-    }
+    ColumnStatistics<?> nonInterestingColStats = columnStats == null
+        ? getNonInterestingColumnsMetadata().getColumnStatistics(column) : null;
 
+    long tableRowCount;
     if (columnStats != null) {
       tableRowCount = TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
     } else if (nonInterestingColStats != null) {
       tableRowCount = TableStatisticsKind.ROW_COUNT.getValue(getNonInterestingColumnsMetadata());
+      columnStats = nonInterestingColStats;
+    } else if (hasNestedStatsForColumn(column, getTableMetadata())
+        || hasNestedStatsForColumn(column, getNonInterestingColumnsMetadata())) {
+      // When statistics for nested field exists, this is complex column which is present in table.
+      // But its nested fields statistics can't be used to extract tableRowCount for this column.
+      // So NO_COLUMN_STATS returned here to avoid problems described in DRILL-7491.
+      return Statistic.NO_COLUMN_STATS;
     } else {
       return 0; // returns 0 if the column doesn't exist in the table.
     }
 
-    columnStats = columnStats != null ? columnStats : nonInterestingColStats;
-    nulls = ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStats);
-    colNulls = nulls != null ? nulls : Statistic.NO_COLUMN_STATS;
+    Long nulls = ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStats);
+    if (nulls == null || Statistic.NO_COLUMN_STATS == nulls || Statistic.NO_COLUMN_STATS == tableRowCount) {
+      return Statistic.NO_COLUMN_STATS;
+    } else {
+      return tableRowCount - nulls;
+    }
+  }
 
-    return Statistic.NO_COLUMN_STATS == tableRowCount
-            || Statistic.NO_COLUMN_STATS == colNulls
-            ? Statistic.NO_COLUMN_STATS : tableRowCount - colNulls;
+  /**
+   * For complex columns, stats may be present only for nested fields. For example, a column path is `a`,
+   * but stats present for `a`.`b`. So before making a decision that column is absent, the case needs
+   * to be tested.
+   *
+   * @param column   parent column path
+   * @param metadata metadata with column statistics
+   * @return whether stats exists for nested fields
+   */
+  private boolean hasNestedStatsForColumn(SchemaPath column, Metadata metadata) {
+    return metadata.getColumnsStatistics().keySet().stream()
+        .anyMatch(path -> path.contains(column));
   }
 
   @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
index ae33f0a..8a5202b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
@@ -287,6 +287,24 @@
   }
 
   @Test
+  public void textConvertAbsentColumn() throws Exception {
+    String sql = "select count(abc) as cnt from cp.`tpch/nation.parquet`";
+
+    queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("DynamicPojoRecordReader")
+        .match();
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("cnt")
+        .baselineValues(0L)
+        .go();
+  }
+
+  @Test
   public void testCountsWithWildCard() throws Exception {
     run("use dfs.tmp");
     String tableName = "parquet_table_counts";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index 54e8252..6e02004 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -861,4 +861,18 @@
         )
         .go();
   }
+
+  @Test // DRILL-7491
+  public void testCountOnComplexTypes() throws Exception {
+    String query = "SELECT " +
+        "COUNT(c13) cnt13, COUNT(c14) cnt14, " +
+        "COUNT(c15) cnt15, COUNT(c16) cnt16 " +
+        "FROM cp.`parquet/hive_all/hive_alltypes.parquet`";
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("cnt13", "cnt14", "cnt15", "cnt16")
+        .baselineValues(3L, 0L, 3L, 3L)
+        .go();
+  }
 }
diff --git a/exec/java-exec/src/test/resources/parquet/hive_all/hive_alltypes.parquet b/exec/java-exec/src/test/resources/parquet/hive_all/hive_alltypes.parquet
new file mode 100644
index 0000000..f3b3063
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/hive_all/hive_alltypes.parquet
Binary files differ
diff --git a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 933e4ff..8ec81ea 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Objects;
 
 import org.apache.drill.common.expression.PathSegment.ArraySegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
@@ -334,36 +335,13 @@
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (!(obj instanceof SchemaPath)) {
-      return false;
-    }
-
-    SchemaPath other = (SchemaPath) obj;
-    if (rootSegment == null) {
-      return (other.rootSegment == null);
-    }
-    return rootSegment.equals(other.rootSegment);
+    return this == obj || obj instanceof SchemaPath
+        && Objects.equals(rootSegment, ((SchemaPath) obj).rootSegment);
   }
 
-  public boolean contains(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (!(obj instanceof SchemaPath)) {
-      return false;
-    }
-
-    SchemaPath other = (SchemaPath) obj;
-    return rootSegment == null || rootSegment.contains(other.rootSegment);
+  public boolean contains(SchemaPath path) {
+    return this == path || path != null
+        && (rootSegment == null || rootSegment.contains(path.rootSegment));
   }
 
   @Override