IMPALA-5713: always reserve memory for preaggs

Before this change the preaggregation was frequently disabled when
running under some memory pressure, e.g. if the aggregation is at the
end of a pipeline of joins and those joins eat up all the memory.
This can result in huge performance degradation since all rows must then
be sent over the network.

This change always reserves 16 * (buffer size + 64kb) bytes per
preaggregation so that it is always able to build some hash tables and
reduce the input somewhat.

This has two parts:
* Changing the frontend reservation calculation
* Removing dead code in the backend that handled the case when the
  initial partitions and hash tables could not be allocated.

Testing:
Passes exhaustive tests.

Change-Id: I2b544f9ec1a906719e2bbb074743926b95a3a573
Reviewed-on: http://gerrit.cloudera.org:8080/7739
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 9baada1..69d297c 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -221,7 +221,6 @@
   DCHECK(remaining_capacity != NULL);
   DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
   DCHECK_GE(*remaining_capacity, 0);
-  if (hash_tbl == nullptr) return false; // Hash table was not created - pass through.
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not aggregated.
   HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 214810f..3824a72 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -529,12 +529,8 @@
     bool ht_needs_expansion = false;
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
       HashTable* hash_tbl = GetHashTable(i);
-      if (hash_tbl == nullptr) {
-        remaining_capacity[i] = 0;
-      } else {
-        remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
-        ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
-      }
+      remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
+      ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
     }
 
     // Stop expanding hash tables if we're not reducing the input sufficiently. As our
@@ -545,7 +541,7 @@
     if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
       for (int i = 0; i < PARTITION_FANOUT; ++i) {
         HashTable* ht = GetHashTable(i);
-        if (ht != nullptr && remaining_capacity[i] < child_batch_->num_rows()) {
+        if (remaining_capacity[i] < child_batch_->num_rows()) {
           SCOPED_TIMER(ht_resize_timer_);
           bool resized;
           RETURN_IF_ERROR(
@@ -585,10 +581,8 @@
   int64_t ht_rows = 0;
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
     HashTable* ht = hash_partitions_[i]->hash_tbl.get();
-    if (ht != nullptr) {
-      ht_mem += ht->CurrentMemSize();
-      ht_rows += ht->size();
-    }
+    ht_mem += ht->CurrentMemSize();
+    ht_rows += ht->size();
   }
 
   // Need some rows in tables to have valid statistics.
@@ -728,16 +722,9 @@
   RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true));
   bool got_buffer;
   RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
-  if (!got_buffer) {
-    stringstream ss;
-    parent->DebugString(2, &ss);
-    DCHECK(parent->is_streaming_preagg_)
-        << "Merge agg should have enough reservation " << parent->id_ << "\n"
-        << parent->buffer_pool_client_.DebugString() << "\n"
-        << ss.str();
-    DiscardAggregatedRowStream();
-  }
-
+  DCHECK(got_buffer) << "Buffer included in reservation " << parent->id_ << "\n"
+                     << parent->buffer_pool_client_.DebugString() << "\n"
+                     << parent->DebugString(2);
   if (!parent->is_streaming_preagg_) {
     unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
         parent->child(0)->row_desc(), &parent->buffer_pool_client_,
@@ -830,16 +817,6 @@
   return Status::OK();
 }
 
-void PartitionedAggregationNode::Partition::DiscardAggregatedRowStream() {
-  DCHECK(parent->is_streaming_preagg_);
-  DCHECK(aggregated_row_stream != nullptr);
-  DCHECK_EQ(aggregated_row_stream->num_rows(), 0);
-  if (hash_tbl != nullptr) hash_tbl->Close();
-  hash_tbl.reset();
-  aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  aggregated_row_stream.reset();
-}
-
 Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
   DCHECK(!parent->is_streaming_preagg_);
   DCHECK(!is_closed);
@@ -1093,6 +1070,12 @@
   }
 }
 
+string PartitionedAggregationNode::DebugString(int indentation_level) const {
+  stringstream ss;
+  DebugString(indentation_level, &ss);
+  return ss.str();
+}
+
 void PartitionedAggregationNode::DebugString(
     int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
@@ -1142,12 +1125,9 @@
       RETURN_IF_ERROR(partition->InitHashTable(&got_memory));
       // Spill the partition if we cannot create a hash table for a merge aggregation.
       if (UNLIKELY(!got_memory)) {
-        if (is_streaming_preagg_) {
-          partition->DiscardAggregatedRowStream();
-        } else {
-          // If we're repartitioning, we will be writing aggregated rows first.
-          RETURN_IF_ERROR(partition->Spill(level > 0));
-        }
+        DCHECK(!is_streaming_preagg_) << "Preagg reserves enough memory for hash tables";
+        // If we're repartitioning, we will be writing aggregated rows first.
+        RETURN_IF_ERROR(partition->Spill(level > 0));
       }
     }
     hash_tbls_[i] = partition->hash_tbl.get();
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index ade223b..72354cc 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -146,6 +146,7 @@
  protected:
   /// Frees local allocations from aggregate_evals_ and agg_fn_evals
   virtual Status QueryMaintenance(RuntimeState* state);
+  virtual std::string DebugString(int indentation_level) const;
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
@@ -394,11 +395,6 @@
     /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise.
     Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT;
 
-    /// Discards the aggregated row stream and hash table. Only valid to call if this is
-    /// a streaming preaggregation and the initial memory allocation for hash tables or
-    /// the aggregated stream failed. The aggregated stream must have 0 rows.
-    void DiscardAggregatedRowStream();
-
     bool is_spilled() const { return hash_tbl.get() == NULL; }
 
     PartitionedAggregationNode* parent;
@@ -732,7 +728,10 @@
   int64_t MinReservation() const {
     DCHECK(!grouping_exprs_.empty());
     // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe.
-    if (is_streaming_preagg_) return 0; // Need 0 buffers to pass through rows.
+    if (is_streaming_preagg_) {
+      // Reserve at least one buffer and a 64kb hash table per partition.
+      return (resource_profile_.spillable_buffer_size + 64 * 1024) * PARTITION_FANOUT;
+    }
     int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
     // Two of the buffers must fit the maximum row.
     return resource_profile_.spillable_buffer_size * (num_buffers - 2) +
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index ce725e0..2778a7f 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -304,25 +304,35 @@
     long bufferSize = queryOptions.getDefault_spillable_buffer_size();
     long maxRowBufferSize =
         computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
-    if (aggInfo_.getGroupingExprs().isEmpty() || useStreamingPreagg_) {
+    if (aggInfo_.getGroupingExprs().isEmpty()) {
       perInstanceMinReservation = 0;
     } else {
+      // This is a grouping pre-aggregation or merge aggregation.
       final int PARTITION_FANOUT = 16;
-      long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0);
       if (perInstanceDataBytes != -1) {
-        long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT;
+        long bytesPerPartition = perInstanceDataBytes / PARTITION_FANOUT;
         // Scale down the buffer size if we think there will be excess free space with the
         // default buffer size, e.g. with small dimension tables.
         bufferSize = Math.min(bufferSize, Math.max(
             queryOptions.getMin_spillable_buffer_size(),
-            BitUtil.roundUpToPowerOf2(bytesPerBuffer)));
+            BitUtil.roundUpToPowerOf2(bytesPerPartition)));
         // Recompute the max row buffer size with the smaller buffer.
         maxRowBufferSize =
             computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
       }
-      // Two of the buffers need to be buffers large enough to hold the maximum-sized row
-      // to serve as input and output buffers while repartitioning.
-      perInstanceMinReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
+      if (useStreamingPreagg_) {
+        // We can execute a streaming preagg without any buffers by passing through rows,
+        // but that is a very low performance mode of execution if the aggregation reduces
+        // its input significantly. Instead reserve memory for one buffer and 64kb of hash
+        // tables per partition. We don't need to reserve memory for large rows since they
+        // can be passed through if needed.
+        perInstanceMinReservation = (bufferSize + 64 * 1024) * PARTITION_FANOUT;
+      } else {
+        long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0);
+        // Two of the buffers need to be buffers large enough to hold the maximum-sized
+        // row to serve as input and output buffers while repartitioning.
+        perInstanceMinReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
+      }
     }
 
     nodeResourceProfile_ = new ResourceProfileBuilder()
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
index 0732771..5a0a76c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
@@ -150,7 +150,7 @@
 group by 1, 2
 having count(*) = 1
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=77.00MB
+Max Per-Host Resource Reservation: Memory=110.00MB
 Per-Host Resource Estimates: Memory=251.12MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -176,11 +176,11 @@
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=85.12MB mem-reservation=31.00MB
+Per-Host Resources: mem-estimate=85.12MB mem-reservation=64.00MB
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey, o_orderstatus
-|  mem-estimate=54.12MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=54.12MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -224,7 +224,7 @@
 select distinct *
 from tpch_parquet.lineitem
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=46.00MB
+Max Per-Host Resource Reservation: Memory=79.00MB
 Per-Host Resource Estimates: Memory=3.31GB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -248,10 +248,10 @@
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=1.69GB mem-reservation=0B
+Per-Host Resources: mem-estimate=1.69GB mem-reservation=33.00MB
 01:AGGREGATE [STREAMING]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
-|  mem-estimate=1.62GB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=1.62GB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
@@ -268,7 +268,7 @@
 from tpch_parquet.lineitem
 group by 1, 2
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=48.00MB
+Max Per-Host Resource Reservation: Memory=81.00MB
 Per-Host Resource Estimates: Memory=482.91MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -293,11 +293,11 @@
 |  tuple-ids=1 row-size=32B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=281.46MB mem-reservation=0B
+Per-Host Resources: mem-estimate=281.46MB mem-reservation=33.00MB
 01:AGGREGATE [STREAMING]
 |  output: group_concat(l_linestatus, ',')
 |  group by: l_orderkey, l_partkey
-|  mem-estimate=201.46MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=201.46MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=32B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 98239a7..9f80785 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -95,11 +95,11 @@
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
-Per-Host Resources: mem-estimate=432.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=432.00MB mem-reservation=99.00MB
 01:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: bigint_col
-|  mem-estimate=128.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=128.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index b9afdf9..90035c1 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -375,8 +375,8 @@
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=8.50MB
-Per-Host Resource Estimates: Memory=116.24MB
+Max Per-Host Resource Reservation: Memory=41.50MB
+Per-Host Resource Estimates: Memory=123.00MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -400,11 +400,11 @@
 |  tuple-ids=1 row-size=16B cardinality=1563438
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=106.24MB mem-reservation=0B
+Per-Host Resources: mem-estimate=113.00MB mem-reservation=33.00MB
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey
-|  mem-estimate=26.24MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=33.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=16B cardinality=1563438
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
@@ -415,8 +415,8 @@
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=9.50MB
-Per-Host Resource Estimates: Memory=232.48MB
+Max Per-Host Resource Reservation: Memory=75.50MB
+Per-Host Resource Estimates: Memory=246.00MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -440,11 +440,11 @@
 |  tuple-ids=1 row-size=16B cardinality=1563438
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=212.48MB mem-reservation=0B
+Per-Host Resources: mem-estimate=226.00MB mem-reservation=66.00MB
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey
-|  mem-estimate=26.24MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=33.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=16B cardinality=1563438
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
@@ -1548,7 +1548,7 @@
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=78B cardinality=600122
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=38.75MB
+Max Per-Host Resource Reservation: Memory=71.75MB
 Per-Host Resource Estimates: Memory=344.33MB
 
 F09:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1643,10 +1643,10 @@
 |  tuple-ids=2 row-size=70B cardinality=575772
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=47.33MB mem-reservation=4.75MB
+Per-Host Resources: mem-estimate=47.33MB mem-reservation=37.75MB
 04:AGGREGATE [STREAMING]
 |  group by: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
-|  mem-estimate=42.58MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=42.58MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=2 row-size=70B cardinality=575772
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -1688,7 +1688,7 @@
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=78B cardinality=600122
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=73.75MB
+Max Per-Host Resource Reservation: Memory=139.75MB
 Per-Host Resource Estimates: Memory=684.91MB
 
 F09:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1799,10 +1799,10 @@
 |  tuple-ids=2 row-size=70B cardinality=575772
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
-Per-Host Resources: mem-estimate=90.91MB mem-reservation=5.75MB
+Per-Host Resources: mem-estimate=90.91MB mem-reservation=71.75MB
 04:AGGREGATE [STREAMING]
 |  group by: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
-|  mem-estimate=42.58MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=42.58MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=2 row-size=70B cardinality=575772
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -1968,7 +1968,7 @@
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=84.12MB
+Max Per-Host Resource Reservation: Memory=150.12MB
 Per-Host Resource Estimates: Memory=511.55MB
 
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2000,11 +2000,11 @@
 |  tuple-ids=6 row-size=100B cardinality=575772
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=115.78MB mem-reservation=50.12MB
+Per-Host Resources: mem-estimate=115.78MB mem-reservation=83.12MB
 08:AGGREGATE [STREAMING]
 |  output: sum(l_quantity)
 |  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
-|  mem-estimate=60.40MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=60.40MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=6 row-size=100B cardinality=575772
 |
 07:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
@@ -2025,11 +2025,11 @@
 |  |  tuple-ids=4 row-size=24B cardinality=1563438
 |  |
 |  F04:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-|  Per-Host Resources: mem-estimate=127.36MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=127.36MB mem-reservation=33.00MB
 |  04:AGGREGATE [STREAMING]
 |  |  output: sum(l_quantity)
 |  |  group by: l_orderkey
-|  |  mem-estimate=39.36MB mem-reservation=0B spill-buffer=2.00MB
+|  |  mem-estimate=39.36MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  |  tuple-ids=4 row-size=24B cardinality=1563438
 |  |
 |  03:SCAN HDFS [tpch.lineitem, RANDOM]
@@ -2098,7 +2098,7 @@
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=122.88MB
+Max Per-Host Resource Reservation: Memory=254.88MB
 Per-Host Resource Estimates: Memory=967.22MB
 
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2130,11 +2130,11 @@
 |  tuple-ids=6 row-size=100B cardinality=575772
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
-Per-Host Resources: mem-estimate=175.68MB mem-reservation=54.88MB
+Per-Host Resources: mem-estimate=175.68MB mem-reservation=120.88MB
 08:AGGREGATE [STREAMING]
 |  output: sum(l_quantity)
 |  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
-|  mem-estimate=60.40MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=60.40MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=6 row-size=100B cardinality=575772
 |
 07:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
@@ -2163,11 +2163,11 @@
 |  |  tuple-ids=4 row-size=24B cardinality=1563438
 |  |
 |  F04:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-|  Per-Host Resources: mem-estimate=254.73MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=254.73MB mem-reservation=66.00MB
 |  04:AGGREGATE [STREAMING]
 |  |  output: sum(l_quantity)
 |  |  group by: l_orderkey
-|  |  mem-estimate=39.36MB mem-reservation=0B spill-buffer=2.00MB
+|  |  mem-estimate=39.36MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  |  tuple-ids=4 row-size=24B cardinality=1563438
 |  |
 |  03:SCAN HDFS [tpch.lineitem, RANDOM]
@@ -2452,7 +2452,7 @@
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=66B cardinality=150000
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=69.94MB
+Max Per-Host Resource Reservation: Memory=102.94MB
 Per-Host Resource Estimates: Memory=473.94MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2478,10 +2478,10 @@
 |  tuple-ids=6 row-size=58B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=345.94MB mem-reservation=35.94MB
+Per-Host Resources: mem-estimate=345.94MB mem-reservation=68.94MB
 09:AGGREGATE [STREAMING]
 |  group by: c_name, o1.o_orderkey, o2.o_orderstatus
-|  mem-estimate=128.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=128.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=6 row-size=58B cardinality=1500000
 |
 01:SUBPLAN
@@ -2531,7 +2531,7 @@
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=66B cardinality=150000
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=139.88MB
+Max Per-Host Resource Reservation: Memory=205.88MB
 Per-Host Resource Estimates: Memory=947.88MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2557,10 +2557,10 @@
 |  tuple-ids=6 row-size=58B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=691.88MB mem-reservation=71.88MB
+Per-Host Resources: mem-estimate=691.88MB mem-reservation=137.88MB
 09:AGGREGATE [STREAMING]
 |  group by: c_name, o1.o_orderkey, o2.o_orderstatus
-|  mem-estimate=128.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=128.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=6 row-size=58B cardinality=1500000
 |
 01:SUBPLAN
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
index 340c3ec..115a8bd 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
@@ -516,7 +516,7 @@
 from tpch_parquet.customer
 group by c_nationkey
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=1.94MB
+Max Per-Host Resource Reservation: Memory=3.94MB
 Per-Host Resource Estimates: Memory=44.00MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -541,11 +541,11 @@
 |  tuple-ids=1 row-size=10B cardinality=25
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
-Per-Host Resources: mem-estimate=34.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=34.00MB mem-reservation=2.00MB
 01:AGGREGATE [STREAMING]
 |  output: avg(c_acctbal)
 |  group by: c_nationkey
-|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=10B cardinality=25
 |
 00:SCAN HDFS [tpch_parquet.customer, RANDOM]
@@ -556,7 +556,7 @@
    mem-estimate=24.00MB mem-reservation=0B
    tuple-ids=0 row-size=10B cardinality=150000
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=3.88MB
+Max Per-Host Resource Reservation: Memory=7.88MB
 Per-Host Resource Estimates: Memory=88.00MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -581,11 +581,11 @@
 |  tuple-ids=1 row-size=10B cardinality=25
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
-Per-Host Resources: mem-estimate=68.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=68.00MB mem-reservation=4.00MB
 01:AGGREGATE [STREAMING]
 |  output: avg(c_acctbal)
 |  group by: c_nationkey
-|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=10B cardinality=25
 |
 00:SCAN HDFS [tpch_parquet.customer, RANDOM]
@@ -603,7 +603,7 @@
 group by 1, 2
 having count(*) = 1
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=51.00MB
+Max Per-Host Resource Reservation: Memory=84.00MB
 Per-Host Resource Estimates: Memory=225.12MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -629,11 +629,11 @@
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=71.12MB mem-reservation=17.00MB
+Per-Host Resources: mem-estimate=71.12MB mem-reservation=50.00MB
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey, o_orderstatus
-|  mem-estimate=54.12MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=54.12MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -672,8 +672,8 @@
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=51.00MB
-Per-Host Resource Estimates: Memory=345.12MB
+Max Per-Host Resource Reservation: Memory=117.00MB
+Per-Host Resource Estimates: Memory=357.00MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -698,11 +698,11 @@
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
-Per-Host Resources: mem-estimate=71.12MB mem-reservation=17.00MB
+Per-Host Resources: mem-estimate=83.00MB mem-reservation=83.00MB
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey, o_orderstatus
-|  mem-estimate=27.06MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=33.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
@@ -753,7 +753,7 @@
 select distinct *
 from tpch_parquet.lineitem
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=67.00MB
 Per-Host Resource Estimates: Memory=3.31GB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -777,10 +777,10 @@
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=1.69GB mem-reservation=0B
+Per-Host Resources: mem-estimate=1.69GB mem-reservation=33.00MB
 01:AGGREGATE [STREAMING]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
-|  mem-estimate=1.62GB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=1.62GB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
@@ -791,7 +791,7 @@
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=134.00MB
 Per-Host Resource Estimates: Memory=6.62GB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -815,10 +815,10 @@
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=3.39GB mem-reservation=0B
+Per-Host Resources: mem-estimate=3.39GB mem-reservation=66.00MB
 01:AGGREGATE [STREAMING]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
-|  mem-estimate=1.62GB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=1.62GB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
@@ -834,7 +834,7 @@
 from functional_parquet.alltypestiny
 group by string_col
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=67.00MB
 Per-Host Resource Estimates: Memory=272.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypestiny
@@ -861,11 +861,11 @@
 |  tuple-ids=1 row-size=24B cardinality=unavailable
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=144.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=144.00MB mem-reservation=33.00MB
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: string_col
-|  mem-estimate=128.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=128.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=24B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypestiny, RANDOM]
@@ -876,7 +876,7 @@
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=unavailable
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=134.00MB
 Per-Host Resource Estimates: Memory=544.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypestiny
@@ -903,11 +903,11 @@
 |  tuple-ids=1 row-size=24B cardinality=unavailable
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=288.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=288.00MB mem-reservation=66.00MB
 01:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: string_col
-|  mem-estimate=128.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=128.00MB mem-reservation=33.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=24B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypestiny, RANDOM]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
index b7345f2..05552d2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
@@ -168,9 +168,10 @@
 ====
 ---- QUERY
 # Test aggregation with minimum required reservation to exercise IMPALA-2708.
-# Merge aggregation requires 17 buffers. The buffer size is 256k for this test.
+# Merge aggregation requires 17 buffers and preaggregation requires 16 buffers
+# plus 1mb of hash tables. The buffer size is 256k for this test.
 set max_row_size=256k;
-set buffer_pool_limit=4352k;
+set buffer_pool_limit=9472k;
 select count(*)
 from (select distinct * from orders) t
 ---- TYPES