IMPALA-6501: Optimize count(star) for Kudu scans

IMPALA-5036 added an optimization for count(star) in Parquet scans
that avoids materializing dummy rows. This change provides a similar
optimization for Kudu tables.

Instead of materializing empty rows when computing count star, we use
the NumRows field from the Kudu API. The Kudu scanner tuple is
modified to have one slot into which we will write the
num rows statistic. The aggregate function is changed from count to a
special sum function that gets initialized to 0.

Tests:
 * Added end-to-end tests
 * Added planner tests
 * Ran performance tests on tpch.lineitem Kudu table with 25 set as
   scaling factor, on 1 node, with mt_dop set to 1, just to measure
   the speedup gained when scanning. Counting the rows before the
   optimization took around 400ms, and around 170ms after.

Change-Id: Ic99e0f954d0ca65779bd531ca79ace1fcb066fb9
Reviewed-on: http://gerrit.cloudera.org:8080/14347
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index f407ca5..d81d817 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -80,8 +80,6 @@
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
           tnode.hdfs_scan_node.skip_header_line_count : 0),
       tuple_id_(tnode.hdfs_scan_node.tuple_id),
-      optimize_parquet_count_star_(
-          tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset),
       parquet_count_star_slot_offset_(
           tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
           tnode.hdfs_scan_node.parquet_count_star_slot_offset : -1),
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 12b4748..f7c33ad 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -198,7 +198,9 @@
   const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); }
   int skip_header_line_count() const { return skip_header_line_count_; }
   io::RequestContext* reader_context() const { return reader_context_.get(); }
-  bool optimize_parquet_count_star() const { return optimize_parquet_count_star_; }
+  bool optimize_parquet_count_star() const {
+    return parquet_count_star_slot_offset_ != -1;
+  }
   int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; }
 
   typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>>
@@ -417,13 +419,10 @@
   /// Tuple id resolved in Prepare() to set tuple_desc_
   const int tuple_id_;
 
-  /// Set to true when this scan node can optimize a count(*) query by populating the
-  /// tuple with data from the Parquet num rows statistic. See
+  /// The byte offset of the slot for Parquet metadata if Parquet count star optimization
+  /// is enabled. When set, this scan node can optimize a count(*) query by populating
+  /// the tuple with data from the Parquet num rows statistic. See
   /// applyParquetCountStartOptimization() in HdfsScanNode.java.
-  const bool optimize_parquet_count_star_;
-
-  // The byte offset of the slot for Parquet metadata if Parquet count star optimization
-  // is enabled.
   const int parquet_count_star_slot_offset_;
 
   /// RequestContext object to use with the disk-io-mgr for reads.
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index 08c50d7..35bd2c1 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -53,7 +53,10 @@
 KuduScanNodeBase::KuduScanNodeBase(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : ScanNode(pool, tnode, descs),
-    tuple_id_(tnode.kudu_scan_node.tuple_id) {
+    tuple_id_(tnode.kudu_scan_node.tuple_id),
+    count_star_slot_offset_(
+            tnode.kudu_scan_node.__isset.count_star_slot_offset ?
+            tnode.kudu_scan_node.count_star_slot_offset : -1) {
   DCHECK(KuduIsAvailable());
 }
 
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index ed010c2..6aa5e28 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -42,6 +42,10 @@
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
       override = 0;
+
+  bool optimize_count_star() const { return count_star_slot_offset_ != -1; }
+  int count_star_slot_offset() const { return count_star_slot_offset_; }
+
  protected:
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
@@ -79,6 +83,12 @@
   /// The next index in 'scan_tokens_' to be assigned.
   int next_scan_token_idx_ = 0;
 
+  /// The byte offset of the slot for Kudu metadata if count star optimization is enabled.
+  /// When set, this scan node can optimize a count(*) query by populating the
+  /// tuple with data from the num rows statistic.
+  /// See applyCountStarOptimization() in ScanNode.java.
+  const int count_star_slot_offset_;
+
   RuntimeProfile::Counter* kudu_round_trips_ = nullptr;
   RuntimeProfile::Counter* kudu_remote_tokens_ = nullptr;
   RuntimeProfile::Counter* kudu_client_time_ = nullptr;
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index f5e0729..4394851 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -105,18 +105,51 @@
   last_alive_time_micros_ = now;
 }
 
+Status KuduScanner::GetNextWithCountStarOptimization(RowBatch* row_batch, bool* eos) {
+  int64_t counter = 0;
+  while (scanner_->HasMoreRows()) {
+    RETURN_IF_CANCELLED(state_);
+    RETURN_IF_ERROR(GetNextScannerBatch());
+
+    cur_kudu_batch_num_read_ = static_cast<int64_t>(cur_kudu_batch_.NumRows());
+    counter += cur_kudu_batch_num_read_;
+  }
+  *eos = true;
+  int64_t tuple_buffer_size;
+  uint8_t* tuple_buffer;
+  int capacity = 1;
+  RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state_,
+      row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), &capacity,
+      &tuple_buffer_size, &tuple_buffer));
+  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
+  Tuple::ClearNullBits(tuple, scan_node_->tuple_desc()->null_bytes_offset(),
+      scan_node_->tuple_desc()->num_null_bytes());
+  int64_t* counter_slot = tuple->GetBigIntSlot(scan_node_->count_star_slot_offset());
+  *counter_slot = counter;
+  TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
+  dst_row->SetTuple(0, tuple);
+  row_batch->CommitLastRow();
+
+  CloseCurrentClientScanner();
+  return Status::OK();
+}
+
 Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+  // Optimized scanning for count(*), only write the NumRows
+  if (scan_node_->optimize_count_star()) {
+    return GetNextWithCountStarOptimization(row_batch, eos);
+  }
   int64_t tuple_buffer_size;
   uint8_t* tuple_buffer;
   RETURN_IF_ERROR(
       row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_buffer));
-  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
 
   // Main scan loop:
   // Tries to fill 'row_batch' with rows from cur_kudu_batch_.
   // If there are no rows to decode, tries to get the next row batch from kudu.
   // If this scanner has no more rows, the scanner is closed and eos is returned.
+  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
   while (!*eos) {
     RETURN_IF_CANCELLED(state_);
 
@@ -277,7 +310,7 @@
 }
 
 Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem) {
-  // Short-circuit the count(*) case.
+  // Short-circuit for empty projection cases.
   if (scan_node_->tuple_desc()->slots().empty()) {
     return HandleEmptyProjection(row_batch);
   }
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 2a44725..011a40b 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -62,6 +62,11 @@
   void Close();
 
  private:
+  /// Handles count(*) queries, writing only the NumRows from the Kudu batch.
+  /// The optimization is possible only in simpler cases e.g. when there are no conjuncts.
+  /// Check ScanNode.java#canApplyCountStarOptimization for full detail.
+  Status GetNextWithCountStarOptimization(RowBatch* row_batch, bool* eos);
+
   /// Handles the case where the projection is empty (e.g. count(*)).
   /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one.
   /// If in the rare case where there is any conjunct, evaluate them once for each row
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index b8b76d1..ab4ae66 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -324,6 +324,9 @@
   // Indicates whether the MT scan node implementation should be used.
   // If this is true, then the MT_DOP query option must be > 0.
   2: optional bool use_mt_scan_node
+
+  // The byte offset of the slot for Kudu metadata if count star optimization is enabled.
+  3: optional i32 count_star_slot_offset
 }
 
 struct TEqJoinCondition {
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index febc287..41480df 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -122,13 +122,6 @@
  * conjuncts can be used to prune a row group by evaluating conjuncts on the
  * column dictionaries.
  *
- * Count(*) aggregation optimization flow:
- * The caller passes in an AggregateInfo to the constructor that this scan node uses to
- * determine whether to apply the optimization or not. The produced smap must then be
- * applied to the AggregateInfo in this query block. We do not apply the smap in this
- * class directly to avoid side effects and make it easier to reason about.
- * See HdfsScanNode.applyParquetCountStartOptimization().
- *
  * TODO: pass in range restrictions.
  */
 public class HdfsScanNode extends ScanNode {
@@ -207,10 +200,6 @@
   private final TReplicaPreference replicaPreference_;
   private final boolean randomReplica_;
 
-  // The AggregationInfo from the query block of this scan node. Used for determining if
-  // the Parquet count(*) optimization can be applied.
-  private final AggregateInfo aggInfo_;
-
   // Number of partitions, files and bytes scanned. Set in computeScanRangeLocations().
   // Might not match 'partitions_' due to table sampling. Grouped by the FsType, so
   // each key value pair maps how many partitions / files / bytes are stored on each fs.
@@ -243,11 +232,6 @@
   // True if this scan node should use the MT implementation in the backend.
   private boolean useMtScanNode_;
 
-  // Should be applied to the AggregateInfo from the same query block. We cannot use the
-  // PlanNode.outputSmap_ for this purpose because we don't want the smap entries to be
-  // propagated outside the query block.
-  protected ExprSubstitutionMap optimizedAggSmap_;
-
   // Conjuncts that can be evaluated while materializing the items (tuples) of
   // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that
   // tuple. Uses a linked hash map for consistent display in explain.
@@ -358,44 +342,14 @@
   }
 
   /**
-   * Adds a new slot descriptor to the tuple descriptor of this scan. The new slot will be
-   * used for storing the data extracted from the Parquet num rows statistic. Also adds an
-   * entry to 'optimizedAggSmap_' that substitutes count(*) with
-   * sum_init_zero(<new-slotref>). Returns the new slot descriptor.
-   */
-  private SlotDescriptor applyParquetCountStartOptimization(Analyzer analyzer) {
-    FunctionCallExpr countFn = new FunctionCallExpr(new FunctionName("count"),
-        FunctionParams.createStarParam());
-    countFn.analyzeNoThrow(analyzer);
-
-    // Create the sum function.
-    SlotDescriptor sd = analyzer.addSlotDescriptor(getTupleDesc());
-    sd.setType(Type.BIGINT);
-    sd.setIsMaterialized(true);
-    sd.setIsNullable(false);
-    sd.setLabel("parquet-stats: num_rows");
-    List<Expr> args = new ArrayList<>();
-    args.add(new SlotRef(sd));
-    FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args);
-    sumFn.analyzeNoThrow(analyzer);
-
-    optimizedAggSmap_ = new ExprSubstitutionMap();
-    optimizedAggSmap_.put(countFn, sumFn);
-    return sd;
-  }
-
-  /**
    * Returns true if the Parquet count(*) optimization can be applied to the query block
    * of this scan node.
    */
-  private boolean canApplyParquetCountStarOptimization(Analyzer analyzer,
+  private boolean canApplyCountStarOptimization(Analyzer analyzer,
       Set<HdfsFileFormat> fileFormats) {
-    if (analyzer.getNumTableRefs() != 1) return false;
-    if (aggInfo_ == null || !aggInfo_.hasCountStarOnly()) return false;
     if (fileFormats.size() != 1) return false;
     if (!fileFormats.contains(HdfsFileFormat.PARQUET)) return false;
-    if (!conjuncts_.isEmpty()) return false;
-    return desc_.getMaterializedSlots().isEmpty() || desc_.hasClusteringColsOnly();
+    return canApplyCountStarOptimization(analyzer);
   }
 
   /**
@@ -427,10 +381,10 @@
       }
     }
 
-    if (canApplyParquetCountStarOptimization(analyzer, fileFormats_)) {
+    if (canApplyCountStarOptimization(analyzer, fileFormats_)) {
       Preconditions.checkState(desc_.getPath().destTable() != null);
       Preconditions.checkState(collectionConjuncts_.isEmpty());
-      countStarSlot_ = applyParquetCountStartOptimization(analyzer);
+      countStarSlot_ = applyCountStarOptimization(analyzer);
     }
 
     computeMemLayout(analyzer);
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 0c86a65..ec19602 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -24,10 +24,15 @@
 import java.util.ListIterator;
 import java.util.Set;
 
+import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.FunctionParams;
 import org.apache.impala.analysis.InPredicate;
 import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.LiteralExpr;
@@ -37,6 +42,7 @@
 import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -103,12 +109,20 @@
   // Exprs in kuduConjuncts_ converted to KuduPredicates.
   private final List<KuduPredicate> kuduPredicates_ = new ArrayList<>();
 
-  public KuduScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts) {
+  // Slot that is used to record the Kudu metadata for the count(*) aggregation if
+  // this scan node has the count(*) optimization enabled.
+  private SlotDescriptor countStarSlot_ = null;
+
+  public KuduScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
+      AggregateInfo aggInfo) {
     super(id, desc, "SCAN KUDU");
     kuduTable_ = (FeKuduTable) desc_.getTable();
     conjuncts_ = conjuncts;
+    aggInfo_ = aggInfo;
   }
 
+  public ExprSubstitutionMap getOptimizedAggSmap() { return optimizedAggSmap_; }
+
   @Override
   public void init(Analyzer analyzer) throws ImpalaRuntimeException {
     conjuncts_ = orderConjunctsByCost(conjuncts_);
@@ -119,6 +133,12 @@
           client.openTable(kuduTable_.getKuduTableName());
       validateSchema(rpcTable);
 
+      if (canApplyCountStarOptimization(analyzer)) {
+        Preconditions.checkState(desc_.getPath().destTable() != null);
+        Preconditions.checkState(kuduConjuncts_.isEmpty());
+        countStarSlot_ = applyCountStarOptimization(analyzer);
+      }
+
       // Extract predicates that can be evaluated by Kudu.
       extractKuduConjuncts(analyzer, client, rpcTable);
 
@@ -153,6 +173,7 @@
       throws ImpalaRuntimeException {
     Schema tableSchema = rpcTable.getSchema();
     for (SlotDescriptor desc: getTupleDesc().getSlots()) {
+      if (!desc.isScanSlot()) continue;
       String colName = ((KuduColumn) desc.getColumn()).getKuduName();
       Type colType = desc.getColumn().getType();
       ColumnSchema kuduCol = null;
@@ -240,7 +261,9 @@
       org.apache.kudu.client.KuduTable rpcTable) {
     List<String> projectedCols = new ArrayList<>();
     for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) {
-      projectedCols.add(((KuduColumn) desc.getColumn()).getKuduName());
+      if (!isCountStarOptimizationDescriptor(desc)) {
+        projectedCols.add(((KuduColumn) desc.getColumn()).getKuduName());
+      }
     }
     KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
     tokenBuilder.setProjectedColumnNames(projectedCols);
@@ -332,6 +355,11 @@
     node.node_type = TPlanNodeType.KUDU_SCAN_NODE;
     node.kudu_scan_node = new TKuduScanNode(desc_.getId().asInt());
     node.kudu_scan_node.setUse_mt_scan_node(useMtScanNode_);
+
+    Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null));
+    if (countStarSlot_ != null) {
+      node.kudu_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 78f3510..1547320 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -20,7 +20,15 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.analysis.AggregateInfo;
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.FunctionParams;
 import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
@@ -56,6 +64,21 @@
   // Scan-range specs. Populated in init().
   protected TScanRangeSpec scanRangeSpecs_;
 
+  // The AggregationInfo from the query block of this scan node. Used for determining if
+  // the count(*) optimization can be applied.
+  // Count(*) aggregation optimization flow:
+  // The caller passes in an AggregateInfo to the constructor that this scan node uses to
+  // determine whether to apply the optimization or not. The produced smap must then be
+  // applied to the AggregateInfo in this query block. We do not apply the smap in this
+  // class directly to avoid side effects and make it easier to reason about.
+  protected AggregateInfo aggInfo_ = null;
+  protected static final String STATS_NUM_ROWS = "stats: num_rows";
+
+  // Should be applied to the AggregateInfo from the same query block. We cannot use the
+  // PlanNode.outputSmap_ for this purpose because we don't want the smap entries to be
+  // propagated outside the query block.
+  protected ExprSubstitutionMap optimizedAggSmap_;
+
   public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) {
     super(id, desc.getId().asList(), displayName);
     desc_ = desc;
@@ -88,6 +111,48 @@
     }
   }
 
+  protected boolean isCountStarOptimizationDescriptor(SlotDescriptor desc) {
+    return desc.getLabel().equals(STATS_NUM_ROWS);
+  }
+
+  /**
+   * Adds a new slot descriptor to the tuple descriptor of this scan. The new slot will be
+   * used for storing the num rows statistic extracted from the underlying storage
+   * (Parquet or Kudu). Also adds an entry to 'optimizedAggSmap_' that substitutes
+   * count(*) with sum_init_zero(<new-slotref>). Returns the new slot descriptor.
+   */
+  protected SlotDescriptor applyCountStarOptimization(Analyzer analyzer) {
+    FunctionCallExpr countFn = new FunctionCallExpr(new FunctionName("count"),
+        FunctionParams.createStarParam());
+    countFn.analyzeNoThrow(analyzer);
+
+    // Create the sum function.
+    SlotDescriptor sd = analyzer.addSlotDescriptor(getTupleDesc());
+    sd.setType(Type.BIGINT);
+    sd.setIsMaterialized(true);
+    sd.setIsNullable(false);
+    sd.setLabel(STATS_NUM_ROWS);
+    List<Expr> args = new ArrayList<>();
+    args.add(new SlotRef(sd));
+    FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args);
+    sumFn.analyzeNoThrow(analyzer);
+
+    optimizedAggSmap_ = new ExprSubstitutionMap();
+    optimizedAggSmap_.put(countFn, sumFn);
+    return sd;
+  }
+
+  /**
+   * Returns true if the count(*) optimization can be applied to the query block
+   * of this scan node.
+   */
+  protected boolean canApplyCountStarOptimization(Analyzer analyzer) {
+    if (analyzer.getNumTableRefs() != 1)  return false;
+    if (aggInfo_ == null || !aggInfo_.hasCountStarOnly()) return false;
+    if (!conjuncts_.isEmpty()) return false;
+    return desc_.getMaterializedSlots().isEmpty() || desc_.hasClusteringColsOnly();
+  }
+
   /**
    * Returns all scan range specs.
    */
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 3a3dba3..f11c031 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -663,10 +663,16 @@
     // Add aggregation, if any.
     if (multiAggInfo != null) {
       // Apply substitution for optimized scan/agg plan,
-      if (scanAggInfo != null && root instanceof HdfsScanNode) {
-        scanAggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
-        scanAggInfo.getMergeAggInfo().substitute(
-            ((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
+      if (scanAggInfo != null) {
+        if (root instanceof HdfsScanNode) {
+          scanAggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
+          scanAggInfo.getMergeAggInfo().substitute(
+              ((HdfsScanNode) root).getOptimizedAggSmap(), analyzer);
+        } else if (root instanceof KuduScanNode) {
+          scanAggInfo.substitute(((KuduScanNode) root).getOptimizedAggSmap(), analyzer);
+          scanAggInfo.getMergeAggInfo().substitute(
+              ((KuduScanNode) root).getOptimizedAggSmap(), analyzer);
+        }
       }
       root = createAggregationPlan(selectStmt, analyzer, root);
     }
@@ -1357,7 +1363,7 @@
    * Create node for scanning all data files of a particular table.
    *
    * The given 'aggInfo' is used for detecting and applying optimizations that span both
-   * the scan and aggregation. Only applicable to HDFS table refs.
+   * the scan and aggregation. Only applicable to HDFS and Kudu table refs.
    *
    * Throws if a PlanNode.init() failed or if planning of the given
    * table ref is not implemented.
@@ -1405,7 +1411,8 @@
       scanNode.init(analyzer);
       return scanNode;
     } else if (tblRef.getTable() instanceof FeKuduTable) {
-      scanNode = new KuduScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts);
+      scanNode = new KuduScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts,
+          aggInfo);
       scanNode.init(analyzer);
       return scanNode;
     } else {
@@ -1570,7 +1577,7 @@
    * CollectionTableRef or an InlineViewRef.
    *
    * The given 'aggInfo' is used for detecting and applying optimizations that span both
-   * the scan and aggregation. Only applicable to HDFS table refs.
+   * the scan and aggregation. Only applicable to HDFS and Kudu table refs.
    *
    * Throws if a PlanNode.init() failed or if planning of the given
    * table ref is not implemented.
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 4873caf..a0e88c2 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -623,6 +623,11 @@
   }
 
   @Test
+  public void testKuduStatsAgg() {
+    runPlannerTestFile("kudu-stats-agg");
+  }
+
+  @Test
   public void testMtDopValidation() {
     // Tests that queries supported with mt_dop > 0 produce a parallel plan, or
     // throw a NotImplementedException otherwise (e.g. plan has a distributed join).
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
index 3d2702b..0fb3fe2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
@@ -60,7 +60,7 @@
 02:EXCHANGE [UNPARTITIONED]
 |
 01:AGGREGATE
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-stats-agg.test
new file mode 100644
index 0000000..0053587
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-stats-agg.test
@@ -0,0 +1,272 @@
+# Verify that the kudu count(*) optimization is applied in all count(*) or
+# count(<literal>) cases when scanning a Kudu table. In the last case, we are scanning
+# a text table, so the optimization is not applied.
+select count(*) from functional_kudu.alltypes
+union all
+select count(1) from functional_kudu.alltypes
+union all
+select count(123) from functional_kudu.alltypes
+union all
+select count(*) from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=4
+|
+|--08:AGGREGATE [FINALIZE]
+|  |  output: count(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:SCAN KUDU [functional_kudu.alltypes]
+|     row-size=8B cardinality=unavailable
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN KUDU [functional_kudu.alltypes]
+|     row-size=8B cardinality=unavailable
+|
+02:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+01:SCAN KUDU [functional_kudu.alltypes]
+   row-size=8B cardinality=unavailable
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=4
+|
+|--16:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  15:EXCHANGE [UNPARTITIONED]
+|  |
+|  08:AGGREGATE
+|  |  output: count(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
+|
+|--14:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  13:EXCHANGE [UNPARTITIONED]
+|  |
+|  06:AGGREGATE
+|  |  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:SCAN KUDU [functional_kudu.alltypes]
+|     row-size=8B cardinality=unavailable
+|
+|--12:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  11:EXCHANGE [UNPARTITIONED]
+|  |
+|  04:AGGREGATE
+|  |  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN KUDU [functional_kudu.alltypes]
+|     row-size=8B cardinality=unavailable
+|
+10:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  row-size=8B cardinality=1
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+01:SCAN KUDU [functional_kudu.alltypes]
+   row-size=8B cardinality=unavailable
+====
+# Verify that the kudu count(*) optimization is applied even if there is more than
+# one item in the select list.
+select count(*), count(1), count(123) from functional_kudu.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=8B cardinality=unavailable
+====
+# The optimization is disabled because tinyint_col is not a partition col.
+select tinyint_col, count(*) from functional_kudu.alltypes group by tinyint_col, year
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: tinyint_col, `year`
+|  row-size=13B cardinality=unavailable
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=5B cardinality=unavailable
+====
+# The optimization is disabled because there are two aggregate functions.
+select avg(year), count(*) from functional_kudu.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(`year`), count(*)
+|  row-size=16B cardinality=1
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=4B cardinality=unavailable
+====
+# Optimization is not applied because the inner count(*) is not materialized. The outer
+# count(*) does not reference a base table.
+select count(*) from (select count(*) from functional_kudu.alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+01:AGGREGATE [FINALIZE]
+|  row-size=0B cardinality=1
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=0B cardinality=unavailable
+====
+# The optimization is applied if count(*) is in the having clause.
+select 1 from functional_kudu.alltypes having count(*) > 1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  having: count(*) > 1
+|  row-size=8B cardinality=0
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=8B cardinality=unavailable
+====
+# The count(*) optimization is applied in the inline view.
+select count(*), count(a) from (select count(1) as a from functional_kudu.alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*), count(count(*))
+|  row-size=16B cardinality=1
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=8B cardinality=unavailable
+====
+# The count(*) optimization is not applied if there is more than 1 table ref.
+select count(*) from functional_kudu.alltypes a, functional_kudu.alltypes b
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=0B cardinality=unavailable
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     row-size=0B cardinality=unavailable
+|
+00:SCAN KUDU [functional_kudu.alltypes a]
+   row-size=0B cardinality=unavailable
+====
+# Optimization is not applied to count(null).
+select count(1 + null + 3) from functional_kudu.alltypes
+union all
+select count(null) from functional_kudu.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=2
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: count(NULL)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN KUDU [functional_kudu.alltypes]
+|     row-size=0B cardinality=unavailable
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(NULL + 3)
+|  row-size=8B cardinality=1
+|
+01:SCAN KUDU [functional_kudu.alltypes]
+   row-size=0B cardinality=unavailable
+====
+# Optimization is not applied when selecting from an empty table.
+select count(*) from functional_kudu.emptytable
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_kudu.emptytable.stats: num_rows)
+|  row-size=8B cardinality=0
+|
+00:SCAN KUDU [functional_kudu.emptytable]
+   row-size=0B cardinality=0
+====
+# Optimization is not applied across query blocks, even though it would be correct here.
+select count(*) from (select int_col from functional_kudu.alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=0B cardinality=unavailable
+====
+# Optimization is not applied when there is a distinct agg.
+select count(*), count(distinct 1) from functional_kudu.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(1), count:merge(*)
+|  row-size=16B cardinality=1
+|
+01:AGGREGATE
+|  output: count(*)
+|  group by: 1
+|  row-size=9B cardinality=1
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   row-size=0B cardinality=unavailable
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
index 6ec083a..f08f40e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
@@ -24,7 +24,7 @@
 |     row-size=0B cardinality=7.30K
 |
 |--06:AGGREGATE [FINALIZE]
-|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  |  row-size=8B cardinality=1
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
@@ -32,7 +32,7 @@
 |     row-size=8B cardinality=unavailable
 |
 |--04:AGGREGATE [FINALIZE]
-|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  |  row-size=8B cardinality=1
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
@@ -40,7 +40,7 @@
 |     row-size=8B cardinality=unavailable
 |
 02:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  row-size=8B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.alltypes]
@@ -74,7 +74,7 @@
 |  13:EXCHANGE [UNPARTITIONED]
 |  |
 |  06:AGGREGATE
-|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  |  row-size=8B cardinality=1
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
@@ -88,7 +88,7 @@
 |  11:EXCHANGE [UNPARTITIONED]
 |  |
 |  04:AGGREGATE
-|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  |  row-size=8B cardinality=1
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
@@ -102,7 +102,7 @@
 09:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  row-size=8B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.alltypes]
@@ -116,7 +116,7 @@
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
@@ -143,7 +143,7 @@
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  group by: `month`, `year`
 |  row-size=16B cardinality=24
 |
@@ -201,7 +201,7 @@
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  having: count(*) > 1
 |  row-size=8B cardinality=0
 |
@@ -219,7 +219,7 @@
 |  row-size=16B cardinality=1
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
@@ -240,7 +240,7 @@
 |  row-size=101B cardinality=7.30K
 |
 |--02:AGGREGATE [FINALIZE]
-|  |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  |  group by: `year`
 |  |  row-size=12B cardinality=2
 |  |
@@ -279,7 +279,7 @@
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
@@ -308,7 +308,7 @@
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
@@ -412,7 +412,7 @@
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
+|  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
 |  group by: `year`
 |  row-size=12B cardinality=2
 |
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 80df92d..d9b4c7d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -2141,7 +2141,7 @@
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
+|  output: sum_init_zero(tpch_parquet.lineitem.stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -2180,7 +2180,7 @@
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=11.00MB mem-reservation=128.00KB thread-reservation=2
 01:AGGREGATE
-|  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
+|  output: sum_init_zero(tpch_parquet.lineitem.stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -2219,7 +2219,7 @@
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=180.00MB mem-reservation=256.00KB thread-reservation=2
 01:AGGREGATE
-|  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
+|  output: sum_init_zero(tpch_parquet.lineitem.stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -5913,19 +5913,19 @@
 Analyzed query: SELECT count(*) FROM functional_kudu.alltypes
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=10.00MB mem-reservation=0B thread-reservation=2
+|  Per-Host Resources: mem-estimate=10.38MB mem-reservation=0B thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
-|  output: count(*)
+|  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN KUDU [functional_kudu.alltypes]
-   mem-estimate=0B mem-reservation=0B thread-reservation=1
+   mem-estimate=384.00KB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=0B cardinality=7.30K
    in pipelines: 00(GETNEXT)
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-stats-agg.test
new file mode 100644
index 0000000..24e53f3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-stats-agg.test
@@ -0,0 +1,100 @@
+====
+---- QUERY
+# Tests the correctness of the Kudu count(*) optimization.
+select count(1)
+from functional_kudu.alltypes
+---- RESULTS
+7300
+---- TYPES
+bigint
+=====
+---- QUERY
+# Kudu count(*) optimization with predicates on the partition columns.
+select count(1)
+from functional_kudu.alltypes where year < 2010 and month > 8
+---- RESULTS
+1220
+---- TYPES
+bigint
+=====
+---- QUERY
+# Kudu count(*) optimization with group by partition columns.
+select year, month, count(1)
+from functional_kudu.alltypes group by year, month
+---- RESULTS
+2009,1,310
+2009,2,280
+2009,3,310
+2009,4,300
+2009,5,310
+2009,6,300
+2009,7,310
+2009,8,310
+2009,9,300
+2009,10,310
+2009,11,300
+2009,12,310
+2010,1,310
+2010,2,280
+2010,3,310
+2010,4,300
+2010,5,310
+2010,6,300
+2010,7,310
+2010,8,310
+2010,9,300
+2010,10,310
+2010,11,300
+2010,12,310
+---- TYPES
+int, int, bigint
+=====
+---- QUERY
+# Kudu count(*) optimization with both group by and predicates on partition columns.
+select count(1)
+from functional_kudu.alltypes where year < 2010 and month > 8
+group by month
+---- RESULTS
+310
+300
+310
+300
+---- TYPES
+bigint
+=====
+---- QUERY
+# Kudu count(*) optimization with the result going into a join.
+select x.bigint_col from functional.alltypes x
+  inner join (
+    select count(1) as a from functional_kudu.alltypes group by year
+  ) t on x.id = t.a;
+---- RESULTS
+0
+0
+---- TYPES
+bigint
+=====
+---- QUERY
+# Kudu count(*) optimization with the agg function in the having clause.
+select 1 from functional_kudu.alltypes having count(*) > 1
+---- RESULTS
+1
+---- TYPES
+tinyint
+====
+---- QUERY
+# Verify that 0 is returned for count(*) on an empty table.
+select count(1) from functional_kudu.emptytable
+---- RESULTS
+0
+---- TYPES
+bigint
+=====
+---- QUERY
+# Verify that 0 is returned when all partitions are pruned.
+select count(1) from functional_kudu.alltypes where year = -1
+---- RESULTS
+0
+---- TYPES
+bigint
+=====
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 038790e..62e9ee7 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -268,6 +268,15 @@
     vector.get_value('exec_option')['batch_size'] = 1
     self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
 
+  def test_kudu_count_star_optimization(self, vector, unique_database):
+    if (vector.get_value('table_format').file_format != 'text' or
+       vector.get_value('table_format').compression_codec != 'none'):
+      # The queries name their tables explicitly, so one table format is enough.
+      pytest.skip()
+    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
+
   def test_sampled_ndv(self, vector, unique_database):
     """The SAMPLED_NDV() function is inherently non-deterministic and cannot be
     reasonably made deterministic with existing options so we test it separately.