IMPALA-9744: Treat corrupt table stats as missing to avoid bad plans

This work addresses the current limitation in computing the total row
count for a Hive table in a scan. The row count can be incorrectly
computed as 0, even though there exists data in the Hive table. This
is the stats corruption at table level. Similar stats corruption
exists for a partition. The row count of a table or a partition
sometime can also be -1 which indicates a missing stats situation.

In the fix, as long as no partition in a Hive table exhibits any
missing or corrupt stats, the total row count for the table is computed
from the row counts in all partitions. Otherwise, Impala looks at
the table level stats particularly the table row count.

In addition, if the table stats is missing or corrupted, Impala
estimates a row count for the table, if feasible. This row count is
the sum of the row count from the partitions with good stats, and
an estimation of the number of rows in the partitions with missing or
corrupt stats. Such estimation also applies when some partition
has corrupt stats.

One way to observe the fix is through the explain of queries scanning
Hive tables with missing or corrupted stats. The cardinality for any
full scan should be a positive value (i.e. the estimated row count),
instead of 'unavailable'.  At the beginning of the explain output,
that table is still listed in the WARNING section for potentially
corrupt table statistics.

Testing:
1. Ran unit tests with queries documented in the case against Hive
   tables with the following configrations:
   a. No stats corruption in any partitions
   b. Stats corruption in some partitions
   c. Stats corruption in all partitions
2. Added two new tests in test_compute_stats.py:
   a. test_corrupted_stats_in_partitioned_Hive_tables
   b. test_corrupted_stats_in_unpartitioned_Hive_tables
3. Fixed failures in corrupt-stats.test
4. Ran "core" test

Change-Id: I9f4c64616ff7c0b6d5a48f2b5331325feeff3576
Reviewed-on: http://gerrit.cloudera.org:8080/16098
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 81b03e0..91a5e75 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1150,9 +1150,11 @@
   /**
    * Computes and returns the number of rows scanned based on the per-partition row count
    * stats and/or the table-level row count stats, depending on which of those are
-   * available, and whether the table is partitioned. Partitions without stats are
-   * ignored as long as there is at least one partition with stats. Otherwise,
-   * we fall back to table-level stats even for partitioned tables.
+   * available. Partition stats are used as long as they are neither missing nor
+   * corrupted. Otherwise, we fall back to table-level stats even for partitioned tables.
+   * We further estimate the row count if the table-level stats is missing or corrupted,
+   * or some partitions are with corrupt stats. The estimation is done only for those
+   * partitions with corrupt stats.
    *
    * Sets these members:
    * numPartitionsWithNumRows_, partitionNumRows_, hasCorruptTableStats_.
@@ -1161,34 +1163,43 @@
     numPartitionsWithNumRows_ = 0;
     partitionNumRows_ = -1;
     hasCorruptTableStats_ = false;
-    if (tbl_.getNumClusteringCols() > 0) {
-      for (FeFsPartition p: partitions_) {
-        // Check for corrupt partition stats
-        long partNumRows = p.getNumRows();
-        if (partNumRows < -1  || (partNumRows == 0 && p.getSize() > 0))  {
-          hasCorruptTableStats_ = true;
-        }
-        // Ignore partitions with missing stats in the hope they don't matter
-        // enough to change the planning outcome.
-        if (partNumRows > -1) {
-          if (partitionNumRows_ == -1) partitionNumRows_ = 0;
-          partitionNumRows_ = checkedAdd(partitionNumRows_, partNumRows);
-          ++numPartitionsWithNumRows_;
-        }
+
+    List<FeFsPartition> partitionsWithCorruptOrMissingStats = new ArrayList<>();
+    for (FeFsPartition p : partitions_) {
+      long partNumRows = p.getNumRows();
+      // Check for corrupt stats
+      if (partNumRows < -1 || (partNumRows == 0 && p.getSize() > 0)) {
+        hasCorruptTableStats_ = true;
+        partitionsWithCorruptOrMissingStats.add(p);
+      } else if (partNumRows == -1) { // Check for missing stats
+        partitionsWithCorruptOrMissingStats.add(p);
+      } else if (partNumRows > -1) {
+        // Consider partition with good stats.
+        if (partitionNumRows_ == -1) partitionNumRows_ = 0;
+        partitionNumRows_ = checkedAdd(partitionNumRows_, partNumRows);
+        ++numPartitionsWithNumRows_;
       }
-      if (numPartitionsWithNumRows_ > 0) return partitionNumRows_;
     }
-    // Table is unpartitioned or the table is partitioned but no partitions have stats.
+    // If all partitions have good stats, return the total row count contributed
+    // by each of the partitions, as the row count for the table.
+    if (partitionsWithCorruptOrMissingStats.size() == 0
+        && numPartitionsWithNumRows_ > 0) {
+      return partitionNumRows_;
+    }
+
     // Set cardinality based on table-level stats.
     long numRows = tbl_.getNumRows();
     // Depending on the query option of disable_hdfs_num_rows_est, if numRows
-    // is still not available, we provide a crude estimation by computing
-    // sumAvgRowSizes, the sum of the slot size of each column of scalar type,
-    // and then generate the estimate using sumValues(totalBytesPerFs_), the size of
-    // the hdfs table.
-    if (!queryOptions.disable_hdfs_num_rows_estimate && numRows == -1L) {
-      // Compute the estimated table size when taking compression into consideration
-      long estimatedTableSize = computeEstimatedTableSize();
+    // is still not available (-1), or partition stats is corrupted, we provide
+    // a crude estimation by computing sumAvgRowSizes, the sum of the slot
+    // size of each column of scalar type, and then generate the estimate using
+    // sumValues(totalBytesPerFs_), the size of the hdfs table.
+    if (!queryOptions.disable_hdfs_num_rows_estimate
+        && (numRows == -1L || hasCorruptTableStats_)) {
+      // Compute the estimated table size from those partitions with missing or corrupt
+      // row count, when taking compression into consideration
+      long estimatedTableSize =
+          computeEstimatedTableSize(partitionsWithCorruptOrMissingStats);
 
       double sumAvgRowSizes = 0.0;
       for (Column col : tbl_.getColumns()) {
@@ -1202,25 +1213,35 @@
         }
       }
 
+      long estNumRows = 0;
       if (sumAvgRowSizes == 0.0) {
         // When the type of each Column is of ArrayType or MapType,
         // sumAvgRowSizes would be equal to 0. In this case, we use a ultimate
         // fallback row width if sumAvgRowSizes == 0.0.
-        numRows = Math.round(estimatedTableSize / DEFAULT_ROW_WIDTH_ESTIMATE);
+        estNumRows = Math.round(estimatedTableSize / DEFAULT_ROW_WIDTH_ESTIMATE);
       } else {
-        numRows = Math.round(estimatedTableSize / sumAvgRowSizes);
+        estNumRows = Math.round(estimatedTableSize / sumAvgRowSizes);
       }
+
+      // Include the row count contributed by partitions with good stats (if any).
+      numRows = partitionNumRows_ =
+          (partitionNumRows_ > 0) ? partitionNumRows_ + estNumRows : estNumRows;
     }
+
     if (numRows < -1 || (numRows == 0 && tbl_.getTotalHdfsBytes() > 0)) {
       hasCorruptTableStats_ = true;
     }
+
     return numRows;
   }
 
-  /** Compute the estimated table size when taking compression into consideration */
-  private long computeEstimatedTableSize() {
+  /**
+   * Compute the estimated table size for the partitions contained in
+   * the partitions argument when taking compression into consideration
+   */
+  private long computeEstimatedTableSize(List<FeFsPartition> partitions) {
     long estimatedTableSize = 0;
-    for (FeFsPartition p: partitions_) {
+    for (FeFsPartition p : partitions) {
       HdfsFileFormat format = p.getFileFormat();
       long estimatedPartitionSize = 0;
       if (format == HdfsFileFormat.TEXT) {
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index f720873..1d03e98 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -325,7 +325,8 @@
     addTestTable("create table test_hdfs_insert_writer_limit.unpartitioned_table"
         + " (id int) location '/'");
     runPlannerTestFile("insert-hdfs-writer-limit", "test_hdfs_insert_writer_limit",
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -383,8 +384,9 @@
     // The FK/PK detection result is included in EXTENDED or higher.
     TQueryOptions options = defaultQueryOptions();
     options.setDisable_hdfs_num_rows_estimate(false);
-    runPlannerTestFile("fk-pk-join-detection-hdfs-num-rows-est-enabled",
-        options, ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+    runPlannerTestFile("fk-pk-join-detection-hdfs-num-rows-est-enabled", options,
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -608,7 +610,9 @@
 
   @Test
   public void testBloomFilterAssignment() {
-    runPlannerTestFile("bloom-filter-assignment");
+    runPlannerTestFile("bloom-filter-assignment",
+        ImmutableSet.of(
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -624,7 +628,8 @@
   @Test
   public void testParquetFiltering() {
     runPlannerTestFile("parquet-filtering",
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -633,7 +638,8 @@
     options.setParquet_dictionary_filtering(false);
     options.setParquet_read_statistics(false);
     runPlannerTestFile("parquet-filtering-disabled", options,
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -798,7 +804,8 @@
   public void testTableSample() {
     TQueryOptions options = defaultQueryOptions();
     runPlannerTestFile("tablesample", options,
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -859,7 +866,9 @@
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setDisable_hdfs_num_rows_estimate(false);
     options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.MIN_MAX);
-    runPlannerTestFile("min-max-runtime-filters-hdfs-num-rows-est-enabled", options);
+    runPlannerTestFile("min-max-runtime-filters-hdfs-num-rows-est-enabled", options,
+        ImmutableSet.of(
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -1013,6 +1022,9 @@
     filter = TestUtils.ROW_SIZE_FILTER;
     assertEquals(" row-size= cardinality=10.3K",
         filter.transform(" row-size=10B cardinality=10.3K"));
+    filter = TestUtils.PARTITIONS_FILTER;
+    assertEquals(" partitions: 0/24 rows=",
+        filter.transform(" partitions: 0/24 rows=10.3K"));
   }
 
   @Test
@@ -1083,7 +1095,9 @@
    */
   @Test
   public void testAcidTableScans() {
-    runPlannerTestFile("acid-scans", "functional_orc_def");
+    runPlannerTestFile("acid-scans", "functional_orc_def",
+        ImmutableSet.of(
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index a675a75..afcfa11 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -540,6 +540,10 @@
       if (!testOptions.contains(PlannerTestOption.VALIDATE_SCAN_FS)) {
         resultFilters.add(TestUtils.SCAN_NODE_SCHEME_FILTER);
       }
+      if (testOptions.contains(
+              PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS)) {
+        resultFilters.add(TestUtils.PARTITIONS_FILTER);
+      }
 
       String planDiff = TestUtils.compareOutput(
           Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, resultFilters);
@@ -830,7 +834,10 @@
     VALIDATE_SCAN_FS,
     // If set, disables the attempt to compute an estimated number of rows in an
     // hdfs table.
-    DISABLE_HDFS_NUM_ROWS_ESTIMATE
+    DISABLE_HDFS_NUM_ROWS_ESTIMATE,
+    // If set, make no attempt to validate the estimated number of rows for any
+    // partitions in an hdfs table.
+    DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS
   }
 
   protected void runPlannerTestFile(String testFile, TQueryOptions options) {
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index f07e239..c7fde3e 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -113,20 +113,31 @@
    */
   public static class IgnoreValueFilter implements ResultFilter {
     // Literal string containing the key name.
-    private final String keyPrefix;
-    private final String valueRegex;
+    protected final String keyPrefix;
+    protected final String valueRegex;
 
     /**
      * Create a filter that ignores the value from key value pairs where the key is
-     * the literal 'key' value and the value matches 'valueRegex'.
+     * the literal 'key' value and the value matches 'valueRegex'. The key and the
+     * value are separated by the 'separator' character.
      */
-    public IgnoreValueFilter(String key, String valueRegex) {
+    public IgnoreValueFilter(
+        String key, String valueRegex, char separator) {
       // Include leading space to avoid matching partial keys, e.g. if key is "bar" we
       // don't want to match "foobar=".
-      this.keyPrefix = " " + key + "=";
+      this.keyPrefix = " " + key + Character.toString(separator);
       this.valueRegex = valueRegex;
     }
 
+    /**
+     *  Create a filter that ignores the value from key value pairs where the key is
+     *  the literal 'key' value and the value matches 'valueRegex'. The key and the
+     *  value are separated by '='.
+     */
+    public IgnoreValueFilter(String key, String valueRegex) {
+      this(key, valueRegex, '=');
+    }
+
     @Override
     public boolean matches(String input) { return input.contains(keyPrefix); }
 
@@ -137,6 +148,30 @@
   }
 
   /**
+   * Filter to replace the value from elements in the format key=value.
+   */
+  public static class ReplaceValueFilter extends IgnoreValueFilter {
+    // Literal string containing the replacement regex.
+    private final String replaceRegex;
+
+    /**
+     * Create a filter that replaces the value from key value pairs where the key is
+     * the literal 'key' value and the value matches 'valueRegex'. The key and the
+     * value are separated by the 'separator' character.
+     */
+    public ReplaceValueFilter(
+        String key, String valueRegex, String replaceRegex, char separator) {
+      super(key, valueRegex, separator);
+      this.replaceRegex = replaceRegex;
+    }
+
+    @Override
+    public String transform(String input) {
+      return input.replaceAll(keyPrefix + valueRegex, keyPrefix + replaceRegex);
+    }
+  }
+
+  /**
    * Filter to ignore the filesystem schemes in the scan node explain output. See
    * {@link org.apache.impala.planner.PlannerTestBase.PlannerTestOption#VALIDATE_SCAN_FS}
    * for more details.
@@ -176,6 +211,11 @@
   public static final IgnoreValueFilter CARDINALITY_FILTER =
       new IgnoreValueFilter("cardinality", "\\S+");
 
+  // Ignore any values after 'rows=' in partitions: 0/24 rows=12.83K or
+  // partitions: 0/24 rows=unavailable entries
+  public static final ReplaceValueFilter PARTITIONS_FILTER =
+      new ReplaceValueFilter("partitions", "( \\d+/\\d+ rows=)\\S+", "$1", ':');
+
   // Ignore the exact estimated row count, which depends on the file sizes.
   static IgnoreValueFilter SCAN_RANGE_ROW_COUNT_FILTER =
       new IgnoreValueFilter("max-scan-range-rows", PrintUtils.METRIC_REGEX);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
index ab982e1..54dfae7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
@@ -292,7 +292,7 @@
 |  |     HDFS partitions=6/24 files=6 size=6.58KB
 |  |     stored statistics:
 |  |       table: rows=unavailable size=unavailable
-|  |       partitions: 0/6 rows=unavailable
+|  |       partitions: 0/6 rows=413
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -303,7 +303,7 @@
 |     HDFS partitions=24/24 files=24 size=54.09KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=3.42K
 |       columns missing stats: id
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -320,7 +320,7 @@
 |     HDFS partitions=6/24 files=6 size=6.58KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/6 rows=unavailable
+|       partitions: 0/6 rows=413
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -332,7 +332,7 @@
    runtime filters: RF000[bloom] -> id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=3.42K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=64.00MB mem-reservation=112.00KB thread-reservation=1
@@ -404,7 +404,7 @@
 |  |     HDFS partitions=6/24 files=6 size=6.58KB
 |  |     stored statistics:
 |  |       table: rows=unavailable size=unavailable
-|  |       partitions: 0/6 rows=unavailable
+|  |       partitions: 0/6 rows=413
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -415,7 +415,7 @@
 |     HDFS partitions=24/24 files=24 size=54.09KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=3.42K
 |       columns missing stats: id
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -439,7 +439,7 @@
 |     HDFS partitions=6/24 files=6 size=6.58KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/6 rows=unavailable
+|       partitions: 0/6 rows=413
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -451,7 +451,7 @@
    runtime filters: RF000[bloom] -> id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=3.42K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=64.00MB mem-reservation=112.00KB thread-reservation=1
@@ -490,7 +490,7 @@
   |  |     HDFS partitions=6/24 files=6 size=6.58KB
   |  |     stored statistics:
   |  |       table: rows=unavailable size=unavailable
-  |  |       partitions: 0/6 rows=unavailable
+  |  |       partitions: 0/6 rows=413
   |  |       columns: all
   |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
   |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -501,7 +501,7 @@
   |     HDFS partitions=24/24 files=24 size=54.09KB
   |     stored statistics:
   |       table: rows=unavailable size=unavailable
-  |       partitions: 0/24 rows=unavailable
+  |       partitions: 0/24 rows=3.42K
   |       columns: all
   |     extrapolated-rows=disabled max-scan-range-rows=unavailable
   |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -518,7 +518,7 @@
   |     HDFS partitions=6/24 files=6 size=6.58KB
   |     stored statistics:
   |       table: rows=unavailable size=unavailable
-  |       partitions: 0/6 rows=unavailable
+  |       partitions: 0/6 rows=413
   |       columns: all
   |     extrapolated-rows=disabled max-scan-range-rows=unavailable
   |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -530,7 +530,7 @@
      runtime filters: RF000[bloom] -> t1.id % 12
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/24 rows=unavailable
+       partitions: 0/24 rows=3.42K
        columns missing stats: id
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -590,7 +590,7 @@
      runtime filters: RF000[bloom] -> t1.id % 12
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/24 rows=unavailable
+       partitions: 0/24 rows=3.42K
        columns missing stats: id
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
@@ -605,7 +605,7 @@
      HDFS partitions=6/24 files=6 size=6.58KB
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/6 rows=unavailable
+       partitions: 0/6 rows=413
        columns: all
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -631,7 +631,7 @@
      HDFS partitions=24/24 files=24 size=54.09KB
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/24 rows=unavailable
+       partitions: 0/24 rows=3.42K
        columns: all
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -646,7 +646,7 @@
      HDFS partitions=6/24 files=6 size=6.58KB
      stored statistics:
        table: rows=unavailable size=unavailable
-       partitions: 0/6 rows=unavailable
+       partitions: 0/6 rows=413
        columns: all
      extrapolated-rows=disabled max-scan-range-rows=unavailable
      mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
index ab0aa15..9e23d9f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
@@ -30,7 +30,7 @@
 |     HDFS partitions=24/24 files=24 size=201.11KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -42,7 +42,7 @@
    runtime filters: RF000[bloom] -> a.id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -126,7 +126,7 @@
    runtime filters: RF000[bloom] -> a.id
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -165,7 +165,7 @@
 |     HDFS partitions=24/24 files=24 size=201.11KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -216,7 +216,7 @@
    runtime filters: RF000[bloom] -> a.id + 1
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -254,7 +254,7 @@
 |     HDFS partitions=24/24 files=24 size=201.11KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -301,7 +301,7 @@
 |     predicates: c.id < CAST(100 AS INT)
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     parquet statistics predicates: c.id < CAST(100 AS INT)
@@ -329,7 +329,7 @@
    runtime filters: RF000[bloom] -> a.int_col, RF002[bloom] -> a.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -372,7 +372,7 @@
 |     predicates: c.id < CAST(100 AS INT)
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     parquet statistics predicates: c.id < CAST(100 AS INT)
@@ -399,7 +399,7 @@
    runtime filters: RF000[bloom] -> a.int_col + 1, RF002[bloom] -> a.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -442,7 +442,7 @@
 |     predicates: c.id < CAST(100 AS INT)
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/24 rows=unavailable
+|       partitions: 0/24 rows=12.84K
 |       columns: unavailable
 |     extrapolated-rows=disabled max-scan-range-rows=unavailable
 |     parquet statistics predicates: c.id < CAST(100 AS INT)
@@ -470,7 +470,7 @@
    runtime filters: RF000[bloom] -> a.timestamp_col, RF003[bloom] -> a.timestamp_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
@@ -533,7 +533,7 @@
    runtime filters: RF000[bloom] -> a.timestamp_col, RF003[bloom] -> a.timestamp_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
index e144a2c..0b8eabb 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection-hdfs-num-rows-est-enabled.test
@@ -79,7 +79,7 @@
    runtime filters: RF000[bloom] -> ss_customer_sk
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/1824 rows=unavailable
+     partitions: 0/1824 rows=8.07M
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=128.00MB mem-reservation=8.00MB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
index 9210208..c4b691d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
@@ -51,7 +51,7 @@
    runtime filters: RF000[bloom] -> b.int_col, RF002[bloom] -> b.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
index d54b4ec..6b53ede 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
@@ -25,7 +25,7 @@
    predicates: int_col IS NULL, int_col > CAST(1 AS INT), int_col > CAST(tinyint_col AS INT), CAST(int_col AS DOUBLE) * rand() > CAST(50 AS DOUBLE)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
@@ -64,7 +64,7 @@
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > '1993-10-01'
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/22 rows=unavailable
+     partitions: 0/22 rows=11.74K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
@@ -95,7 +95,7 @@
    predicates: id IN (int_col), id NOT IN (CAST(0 AS INT), CAST(1 AS INT), CAST(2 AS INT)), int_col % CAST(50 AS INT) IN (CAST(0 AS INT), CAST(1 AS INT)), string_col IN ('aaaa', 'bbbb', 'cccc', NULL)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
@@ -346,7 +346,7 @@
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > '1993-10-01'
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/4 rows=unavailable
+     partitions: 0/4 rows=2.55K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 7b94c17..90792b8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -24,7 +24,7 @@
    predicates: int_col IS NULL, int_col > CAST(1 AS INT), int_col > CAST(tinyint_col AS INT), CAST(int_col AS DOUBLE) * rand() > CAST(50 AS DOUBLE)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: int_col > CAST(1 AS INT)
@@ -89,7 +89,7 @@
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > '1993-10-01'
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/22 rows=unavailable
+     partitions: 0/22 rows=11.74K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), date_string_col > '1993-10-01'
@@ -159,7 +159,7 @@
    predicates: id IN (int_col), id NOT IN (CAST(0 AS INT), CAST(1 AS INT), CAST(2 AS INT)), int_col % CAST(50 AS INT) IN (CAST(0 AS INT), CAST(1 AS INT)), string_col IN ('aaaa', 'bbbb', 'cccc', NULL)
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet dictionary predicates: id NOT IN (CAST(0 AS INT), CAST(1 AS INT), CAST(2 AS INT)), int_col % CAST(50 AS INT) IN (CAST(0 AS INT), CAST(1 AS INT)), string_col IN ('aaaa', 'bbbb', 'cccc', NULL)
@@ -600,7 +600,7 @@
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > '1993-10-01'
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/4 rows=unavailable
+     partitions: 0/4 rows=2.55K
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), date_string_col > '1993-10-01'
@@ -637,7 +637,7 @@
    predicates: bool_col, bigint_col < CAST(5000 AS BIGINT), double_col > CAST(100.00 AS DOUBLE), float_col > CAST(50.00 AS FLOAT), id = CAST(1 AS INT), tinyint_col < CAST(50 AS TINYINT), int_col % CAST(2 AS INT) = CAST(1 AS INT), string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (CAST(1 AS SMALLINT), CAST(2 AS SMALLINT), CAST(3 AS SMALLINT), CAST(4 AS SMALLINT), CAST(5 AS SMALLINT)), timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = CAST(1 AS INT), date_string_col > '1993-10-01'
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/0 rows=unavailable
+     partitions: 0/0 rows=0
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=0
    mem-estimate=0B mem-reservation=0B thread-reservation=0
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 22d9144..2ff75a2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -178,7 +178,7 @@
    HDFS partitions=3/24 files=3 size=25.50KB
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=unavailable
+     partitions: 0/24 rows=12.84K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/union.test b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
index 396f85f..818367b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/union.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
@@ -4152,7 +4152,7 @@
    partitions=0/0 files=0 size=0B
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/0 rows=unavailable
+     partitions: 0/0 rows=0
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=0
    mem-estimate=0B mem-reservation=0B thread-reservation=0
@@ -4190,7 +4190,7 @@
 |     partitions=0/0 files=0 size=0B
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
-|       partitions: 0/0 rows=unavailable
+|       partitions: 0/0 rows=0
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=0
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
diff --git a/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test b/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test
index 00af63e..6f2c696 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/corrupt-stats.test
@@ -59,15 +59,18 @@
 ''
 '03:AGGREGATE [FINALIZE]'
 '|  output: count:merge(*)'
+'|  row-size=8B cardinality=1'
 '|'
 '02:EXCHANGE [UNPARTITIONED]'
 '|'
 '01:AGGREGATE'
 '|  output: count(*)'
+'|  row-size=8B cardinality=1'
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted]'
+'   partition predicates: org = 1'
 '   $FILESYSTEM_NAME partitions=1/2 files=1 size=24B'
-'   row-size=0B cardinality=0'
+'   row-size=0B cardinality=1'
 ---- TYPES
 STRING
 ====
@@ -143,7 +146,7 @@
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted]'
 '   $FILESYSTEM_NAME partitions=1/2 files=1 size=24B'
-'   row-size=0B cardinality=6'
+'   row-size=0B cardinality=1'
 ---- TYPES
 STRING
 ====
@@ -203,7 +206,7 @@
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted_no_part]'
 '   $FILESYSTEM_NAME partitions=1/1 files=1 size=6B'
-'   row-size=0B cardinality=0'
+'   row-size=0B cardinality=2'
 ---- TYPES
 STRING
 ====
@@ -226,7 +229,7 @@
 '|'
 '00:SCAN $FILESYSTEM_NAME [$DATABASE.corrupted_no_part]'
 '   $FILESYSTEM_NAME partitions=1/1 files=1 size=6B'
-'   row-size=0B cardinality=unavailable'
+'   row-size=0B cardinality=2'
 ---- TYPES
 STRING
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 7efb68d..0f445b2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -11,7 +11,7 @@
 ---- RESULTS: VERIFY_IS_SUBSET
 '   stored statistics:'
 '     table: rows=unavailable size=unavailable'
-'     partitions: 0/12 rows=unavailable'
+'     partitions: 0/12 rows=5.97K'
 '     columns: unavailable'
 row_regex:.* extrapolated-rows=unavailable.*
 '   tuple-ids=0 row-size=4B cardinality=5.97K'
@@ -218,7 +218,7 @@
 ---- RESULTS: VERIFY_IS_SUBSET
 '   stored statistics:'
 '     table: rows=unavailable size=unavailable'
-'     partitions: 0/24 rows=unavailable'
+'     partitions: 0/24 rows=17.91K'
 '     columns: unavailable'
 row_regex:.* extrapolated-rows=unavailable.*
 row_regex:.* tuple-ids=0 row-size=4B cardinality=17\.9.*K
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py
index 70e99f5..5455d60 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -29,6 +29,8 @@
     create_uncompressed_text_dimension)
 from CatalogObjects.ttypes import THdfsCompression
 
+import os
+
 
 IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()
 
@@ -170,6 +172,126 @@
     assert(len(show_result.data) == 2)
     assert("1\tpval\t8" in show_result.data[0])
 
+  @staticmethod
+  def create_load_test_corrupt_stats(self, unique_database, create_load_stmts,
+          table_name, partitions, files):
+    """A helper method for tests against the fix to IMPALA-9744."""
+    # Create and load the Hive table.
+    self.run_stmt_in_hive(create_load_stmts)
+
+    # Make the table visible in Impala.
+    self.execute_query("invalidate metadata %s.%s" % (unique_database, table_name))
+
+    # Formulate a simple query that scans the Hive table.
+    explain_stmt = """
+    explain select * from {0}.{1} where
+    int_col > (select 3*stddev(int_col) from {0}.{1})
+    """.format(unique_database, table_name)
+    explain_result = self.execute_query(explain_stmt)
+
+    # Formulate a template which verifies the number of partitions and the number
+    # of files are per spec.
+    hdfs_physical_properties_template \
+      = """HDFS partitions={0}/{0} files={1}""".format(partitions, files)
+
+    # Check that the template formulated above exists and row count of the table is
+    # not zero, for all scans.
+    for i in xrange(len(explain_result.data)):
+      if ("SCAN HDFS" in explain_result.data[i]):
+         assert(hdfs_physical_properties_template in explain_result.data[i + 1])
+         assert("cardinality=0" not in explain_result.data[i + 2])
+
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  def test_corrupted_stats_in_partitioned_hive_tables(self, vector, unique_database):
+    """IMPALA-9744: Tests that the partition stats corruption in Hive tables
+    (row count=0, partition size>0, persisted when the data was loaded with
+    hive.stats.autogather=true) is handled at the table scan level.
+    """
+    # Unless something drastic changes in Hive and/or Impala, this test should
+    # always succeed.
+    if self.exploration_strategy() != 'exhaustive': pytest.skip()
+
+    # Load from a local data file
+    local_file = os.path.join(os.environ['IMPALA_HOME'],
+                 "testdata/data/alltypes_tiny_pages.parquet")
+    table_name = "partitioned_table_with_corrupted_and_missing_stats"
+
+    # Setting hive.stats.autogather=true after CRTB DDL but before LOAD DML
+    # minimally reproduces the corrupt stats issue.
+    create_load_stmts = """
+      CREATE TABLE {0}.{1} (
+        id int COMMENT 'Add a comment',
+        bool_col boolean,
+        tinyint_col tinyint,
+        smallint_col smallint,
+        int_col int,
+        bigint_col bigint,
+        float_col float,
+        double_col double,
+        date_string_col string,
+        string_col string,
+        timestamp_col timestamp,
+        year int,
+        month int )
+        PARTITIONED BY (decade string)
+        STORED AS PARQUET;
+      set hive.stats.autogather=true;
+      load data local inpath '{2}' into table {0}.{1} partition (decade="corrupt-stats");
+      set hive.stats.autogather=false;
+      load data local inpath '{2}' into table {0}.{1} partition (decade="missing-stats");
+    """.format(unique_database, table_name, local_file)
+
+    self.create_load_test_corrupt_stats(self, unique_database, create_load_stmts,
+            table_name, 2, 2)
+
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  def test_corrupted_stats_in_unpartitioned_hive_tables(self, vector, unique_database):
+    """IMPALA-9744: Tests that the stats corruption in unpartitioned Hive
+    tables (row count=0, partition size>0, persisted when the data was loaded
+    with hive.stats.autogather=true) is handled at the table scan level.
+    """
+    # Unless something drastic changes in Hive and/or Impala, this test should
+    # always succeed.
+    if self.exploration_strategy() != 'exhaustive': pytest.skip()
+
+    # Load from a local data file
+    local_file = os.path.join(os.environ['IMPALA_HOME'],
+                 "testdata/data/alltypes_tiny_pages.parquet")
+    table_name = "nonpartitioned_table_with_corrupted_stats"
+
+    # Setting hive.stats.autogather=true prior to CRTB DDL minimally reproduces the
+    # corrupt stats issue.
+    create_load_stmts = """
+      set hive.stats.autogather=true;
+      CREATE TABLE {0}.{1} (
+        id int COMMENT 'Add a comment',
+        bool_col boolean,
+        tinyint_col tinyint,
+        smallint_col smallint,
+        int_col int,
+        bigint_col bigint,
+        float_col float,
+        double_col double,
+        date_string_col string,
+        string_col string,
+        timestamp_col timestamp,
+        year int,
+        month int)
+        STORED AS PARQUET;
+      load data local inpath '{2}' into table {0}.{1};
+    """.format(unique_database, table_name, local_file)
+
+    self.create_load_test_corrupt_stats(self, unique_database, create_load_stmts,
+            table_name, 1, 1)
+
   @SkipIfS3.eventually_consistent
   @SkipIfCatalogV2.stats_pulling_disabled()
   def test_pull_stats_profile(self, vector, unique_database):
diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py
index 9f2d61b..bc408db 100644
--- a/tests/metadata/test_explain.py
+++ b/tests/metadata/test_explain.py
@@ -129,13 +129,20 @@
     result = self.execute_query("explain select * from %s where p = 1" % mixed_tbl,
         query_options={'explain_level':3})
     check_cardinality(result.data, '100')
+    # Set the number of rows at the table level to -1.
+    self.execute_query(
+      "alter table %s set tblproperties('numRows'='-1')" % mixed_tbl)
     # Set the number of rows for a single partition.
     self.execute_query(
       "alter table %s partition(p=1) set tblproperties('numRows'='50')" % mixed_tbl)
-    # Use partition stats when availabe. Partitions without stats are ignored.
+    # Use partition stats when availabe. Row counts for partitions without
+    # stats are estimated.
     result = self.execute_query("explain select * from %s" % mixed_tbl,
         query_options={'explain_level':3})
-    check_cardinality(result.data, '50')
+    check_cardinality(result.data, '51')
+    # Set the number of rows at the table level back to 100.
+    self.execute_query(
+      "alter table %s set tblproperties('numRows'='100')" % mixed_tbl)
     # Fall back to table-level stats when no selected partitions have stats.
     result = self.execute_query("explain select * from %s where p = 2" % mixed_tbl,
         query_options={'explain_level':3})