IMPALA-9382: part 1: transposed profile prototype

This adds an experimental profile representation
that is denser than the traditional representation.
Counters, info strings and other information for all
instances of a fragment are merged into a single
tree. Descriptive stats (min, max, mean) are shown for
each counter, along with the values for each instance. It
can be enabled by setting --gen_experimental_profile=true.
The default behaviour is unchanged, aside from including
a few extra counters in existing profiles.

An example of the pretty-printed profile is attached
to the JIRA.

The thrift representation of the profile is extended
so that all instances of a fragment can be merged
together into a single "aggregated" fragment, with
vectors of counters.

The in-memory representation is transformed in
a similar way. The RuntimeProfile class is
restructured so that there is a common RuntimeProfileBase
class, with RuntimeProfile and AggregatedRuntimeProfile
subclasses. Execution fills in counters in RuntimeProfile
for each instance, then these are aggregated together into
an AggregatedRuntimeProfile on the coordinator. This replaces
the "averaged" profile concept with an abstraction that
more clearly distinguishes what operations apply to aggregated
and unaggregated profiles.
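
As a rough illustration of the aggregation step described above,
the sketch below keeps one slot per fragment instance and computes
descriptive stats on demand. AggregatedCounterSketch and its members
are hypothetical names for this example only; the real
AveragedCounter in this patch uses atomics and also handles
DOUBLE_VALUE counters and percentiles.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a per-fragment aggregated counter: one slot per
// fragment instance, with summary stats computed on demand.
class AggregatedCounterSketch {
 public:
  explicit AggregatedCounterSketch(std::size_t num_instances)
    : has_value_(num_instances, false), values_(num_instances, 0) {}

  // Record the latest value reported by instance 'idx'. Repeating an update
  // for the same instance overwrites its slot instead of double-counting it.
  void Update(std::size_t idx, int64_t value) {
    assert(idx < values_.size());
    has_value_[idx] = true;
    values_[idx] = value;
  }

  struct Stats {
    int64_t min = 0;
    int64_t mean = 0;
    int64_t max = 0;
    int num_vals = 0;
  };

  // Compute min/mean/max over the instances that have reported so far.
  Stats GetStats() const {
    Stats s;
    int64_t sum = 0;
    for (std::size_t i = 0; i < values_.size(); ++i) {
      if (!has_value_[i]) continue;
      s.min = s.num_vals == 0 ? values_[i] : std::min(s.min, values_[i]);
      s.max = s.num_vals == 0 ? values_[i] : std::max(s.max, values_[i]);
      sum += values_[i];
      ++s.num_vals;
    }
    if (s.num_vals > 0) s.mean = sum / s.num_vals;
    return s;
  }

 private:
  std::vector<bool> has_value_;
  std::vector<int64_t> values_;
};
```

Indexing by instance makes repeated updates from the same instance
idempotent, which matches the behaviour the updated CountersTest
checks for Update().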

In a future change, we could use AggregatedRuntimeProfile
for status reports so that less data needs to be sent to
the coordinator, and the coordinator needs to do less
processing.

The new profile drops the bad practice of including aggregated
stats as strings. These stats can now be computed
automatically as aggregations of counters. The legacy uses of
InfoString are preserved so as to not lose information but
can be removed when we switch to the transposed profile.

Also make TotalTime and InactiveTime behave like other counters -
they are pretty-printed the same as other counters. Inactive time
is also now subtracted from local time in the averaged profile,
which fixes IMPALA-2794.
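
A minimal sketch of the local time arithmetic, assuming local time
is total time minus the children's total time minus inactive time.
LocalTimeNs is a hypothetical helper for illustration; the actual
computation lives in ComputeTimeInProfile() and may differ in
detail.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper: time attributed to a profile node itself, in
// nanoseconds, after removing inactive time and time spent in children.
int64_t LocalTimeNs(int64_t total_time_ns, int64_t inactive_time_ns,
    const std::vector<int64_t>& children_total_time_ns) {
  int64_t local = total_time_ns - inactive_time_ns;
  for (int64_t child : children_total_time_ns) local -= child;
  // Clamp at zero (an assumption of this sketch) so that inconsistent inputs
  // never yield a negative local time.
  return std::max<int64_t>(local, 0);
}
```

For example, a node with 100ns total time, 20ns inactive time and
children totalling 40ns has 40ns of local time.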

TODO in later patches for IMPALA-9382:
These need to be fixed before this can be considered
production-ready.
* The JSON profile generation is not fully implemented for aggregated
  profiles.
* Not all counter types are included in the aggregated profile, e.g.
  time series counters.
* The pretty-printing of the various profile counters will need to be
  improved to be more readable, e.g. grouping by host, improving
  formatting.
* The aggregated profile is only updated at the end of the query.
  We need to support live updating.
* Consider how to show local time per instance - make it a first-class
  counter in the profile?

Possible extensions:
* We could better highlight outliers when pretty-printing the profile.

Testing:
* I diffed the text profile of TPC-DS Q1 to make sure there were no
  unexpected changes.
* Added unit test for stats computation in AveragedCounter.
* Passed core, exhaustive and ASAN tests.
* Ran some tests locally with TSAN

Change-Id: I0838c6a0872f57c696267ff4e92d29c08748eb7a
Reviewed-on: http://gerrit.cloudera.org:8080/15798
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 611963c..4a9e7e8 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -1392,10 +1392,10 @@
       nullptr, total_mem, profile, &client));
   ASSERT_TRUE(client.IncreaseReservation(total_mem));
 
-  RuntimeProfile* buffer_pool_profile = nullptr;
-  vector<RuntimeProfile*> profile_children;
+  RuntimeProfileBase* buffer_pool_profile = nullptr;
+  vector<RuntimeProfileBase*> profile_children;
   profile->GetChildren(&profile_children);
-  for (RuntimeProfile* child : profile_children) {
+  for (RuntimeProfileBase* child : profile_children) {
     if (child->name() == "Buffer pool") {
       buffer_pool_profile = child;
       break;
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 5250927..fdf21d4 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -56,10 +56,18 @@
 using namespace rapidjson;
 namespace accumulators = boost::accumulators;
 
+DECLARE_bool(gen_experimental_profile);
 DECLARE_int32(backend_client_rpc_timeout_ms);
 DECLARE_int64(rpc_max_message_size);
 
 namespace impala {
+PROFILE_DEFINE_COUNTER(BytesAssigned, STABLE_HIGH, TUnit::BYTES,
+    "Total number of bytes of filesystem scan ranges assigned to this fragment "
+    "instance.");
+PROFILE_DEFINE_TIMER(
+    CompletionTime, STABLE_HIGH, "Completion time of this fragment instance");
+PROFILE_DEFINE_COUNTER(ExecutionRate, STABLE_LOW, TUnit::BYTES_PER_SECOND,
+    "Rate at which the fragment instance processed its input scan ranges.");
 
 const char* Coordinator::BackendState::InstanceStats::LAST_REPORT_TIME_DESC =
     "Last report received time";
@@ -521,12 +529,20 @@
     DCHECK_LT(fragment_idx, fragment_stats.size());
     FragmentStats* f = fragment_stats[fragment_idx];
     int64_t completion_time = instance_stats.stopwatch_.ElapsedTime();
-    f->completion_times_(completion_time);
+    RuntimeProfile::Counter* completion_timer =
+        PROFILE_CompletionTime.Instantiate(instance_stats.profile_);
+    completion_timer->Set(completion_time);
+    if (!FLAGS_gen_experimental_profile) f->completion_times_(completion_time);
     if (completion_time > 0) {
-      f->rates_(instance_stats.total_split_size_
-        / (completion_time / 1000.0 / 1000.0 / 1000.0));
+      RuntimeProfile::Counter* execution_rate_counter =
+          PROFILE_ExecutionRate.Instantiate(instance_stats.profile_);
+      double rate =
+          instance_stats.total_split_size_ / (completion_time / 1000.0 / 1000.0 / 1000.0);
+      execution_rate_counter->Set(static_cast<int64_t>(rate));
+      if (!FLAGS_gen_experimental_profile) f->rates_(rate);
     }
-    f->avg_profile_->UpdateAverage(instance_stats.profile_);
+    f->agg_profile_->Update(
+        instance_stats.profile_, instance_stats.per_fragment_instance_idx());
   }
 }
 
@@ -700,14 +716,19 @@
   profile_->AddInfoString(LAST_REPORT_TIME_DESC, ToStringFromUnixMillis(UnixMillis()));
   fragment_stats->root_profile()->AddChild(profile_);
 
-  // add total split size to fragment_stats->bytes_assigned()
+  // Compute total split size and add to profile as "BytesAssigned".
   for (const auto& entry : exec_params_.per_node_scan_ranges()) {
     for (const ScanRangeParamsPB& scan_range_params : entry.second.scan_ranges()) {
       if (!scan_range_params.scan_range().has_hdfs_file_split()) continue;
       total_split_size_ += scan_range_params.scan_range().hdfs_file_split().length();
     }
   }
-  (*fragment_stats->bytes_assigned())(total_split_size_);
+  RuntimeProfile::Counter* bytes_assigned_counter =
+      PROFILE_BytesAssigned.Instantiate(profile_);
+  bytes_assigned_counter->Set(total_split_size_);
+  if (!FLAGS_gen_experimental_profile) {
+    (*fragment_stats->bytes_assigned())(total_split_size_);
+  }
 }
 
 void Coordinator::BackendState::InstanceStats::Update(
@@ -783,14 +804,16 @@
       document->GetAllocator());
 }
 
-Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
+Coordinator::FragmentStats::FragmentStats(const string& agg_profile_name,
     const string& root_profile_name, int num_instances, ObjectPool* obj_pool)
-  : avg_profile_(RuntimeProfile::Create(obj_pool, avg_profile_name, true)),
+  : agg_profile_(
+        AggregatedRuntimeProfile::Create(obj_pool, agg_profile_name, num_instances)),
     root_profile_(RuntimeProfile::Create(obj_pool, root_profile_name)),
-    num_instances_(num_instances) {
-}
+    num_instances_(num_instances) {}
 
 void Coordinator::FragmentStats::AddSplitStats() {
+  // These strings are not included in the transposed profile because we have counters.
+  if (FLAGS_gen_experimental_profile) return;
   double min = accumulators::min(bytes_assigned_);
   double max = accumulators::max(bytes_assigned_);
   double mean = accumulators::mean(bytes_assigned_);
@@ -800,11 +823,13 @@
     << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES)
     << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES)
     << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES);
-  avg_profile_->AddInfoString("split sizes", ss.str());
+  agg_profile_->AddInfoString("split sizes", ss.str());
 }
 
 void Coordinator::FragmentStats::AddExecStats() {
   root_profile_->SortChildrenByTotalTime();
+  // These strings are not included in the transposed profile because we have counters.
+  if (FLAGS_gen_experimental_profile) return;
   stringstream times_label;
   times_label
     << "min:" << PrettyPrinter::Print(
@@ -827,11 +852,9 @@
     << "  stddev:" << PrettyPrinter::Print(
         sqrt(accumulators::variance(rates_)), TUnit::BYTES_PER_SECOND);
 
-  // why plural?
-  avg_profile_->AddInfoString("completion times", times_label.str());
-  // why plural?
-  avg_profile_->AddInfoString("execution rates", rates_label.str());
-  avg_profile_->AddInfoString("num instances", lexical_cast<string>(num_instances_));
+  agg_profile_->AddInfoString("completion times", times_label.str());
+  agg_profile_->AddInfoString("execution rates", rates_label.str());
+  agg_profile_->AddInfoString("num instances", lexical_cast<string>(num_instances_));
 }
 
 void Coordinator::BackendState::ToJson(Value* value, Document* document) {
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 9de5715..8818613 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -151,7 +151,7 @@
   /// Merges the incoming 'thrift_profile' into this backend state's host profile.
   void UpdateHostProfile(const TRuntimeProfileTree& thrift_profile);
 
-  /// Update completion_times, rates, and avg_profile for all fragment_stats.
+  /// Update completion_times, rates, and agg_profile for all fragment_stats.
   void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats);
 
   /// Make a PublishFilter rpc with given params to this backend. The backend
@@ -430,16 +430,15 @@
       boost::accumulators::tag::variance>
   > SummaryStats;
 
-  /// Create avg and root profiles in obj_pool.
-  FragmentStats(const std::string& avg_profile_name,
-      const std::string& root_profile_name,
+  /// Create aggregated and root profiles in obj_pool.
+  FragmentStats(const std::string& agg_profile_name, const std::string& root_profile_name,
       int num_instances, ObjectPool* obj_pool);
 
-  RuntimeProfile* avg_profile() { return avg_profile_; }
+  AggregatedRuntimeProfile* agg_profile() { return agg_profile_; }
   RuntimeProfile* root_profile() { return root_profile_; }
   SummaryStats* bytes_assigned() { return &bytes_assigned_; }
 
-  /// Compute stats for 'bytes_assigned' and add as info string to avg_profile.
+  /// Compute stats for 'bytes_assigned' and add as info string to agg_profile.
   void AddSplitStats();
 
   /// Add summary string with execution stats to avg profile.
@@ -453,7 +452,7 @@
   /// counters in the fragment instance profiles.
   /// Note that the individual fragment instance profiles themselves are stored and
   /// displayed as children of the root_profile below.
-  RuntimeProfile* avg_profile_;
+  AggregatedRuntimeProfile* agg_profile_;
 
   /// root profile for all fragment instances for this fragment; resides in obj_pool
   RuntimeProfile* root_profile_;
@@ -462,12 +461,15 @@
   int num_instances_;
 
   /// Bytes assigned for instances of this fragment
+  /// TODO: IMPALA-9382: can remove when we switch to the transposed profile.
   SummaryStats bytes_assigned_;
 
   /// Completion times for instances of this fragment
+  /// TODO: IMPALA-9382: can remove when we switch to the transposed profile.
   SummaryStats completion_times_;
 
   /// Execution rates for instances of this fragment
+  /// TODO: IMPALA-9382: can remove when we switch to the transposed profile.
   SummaryStats rates_;
 };
 
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 529ce2d..7bf5ec7 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -75,6 +75,7 @@
 using boost::algorithm::split;
 using boost::filesystem::path;
 
+DECLARE_bool(gen_experimental_profile);
 DECLARE_string(hostname);
 
 using namespace impala;
@@ -209,20 +210,25 @@
   DCHECK_GT(exec_params_.num_fragments(), 0);
   for (const TPlanFragment* fragment : exec_params_.GetFragments()) {
     string root_profile_name =
-        Substitute(fragment == coord_fragment ? "Coordinator Fragment $0" : "Fragment $0",
-            fragment->display_name);
-    string avg_profile_name = Substitute("Averaged Fragment $0", fragment->display_name);
+        Substitute(
+          fragment == coord_fragment ? "Coordinator Fragment $0" : "Fragment $0",
+          fragment->display_name);
+    const string& agg_profile_name = FLAGS_gen_experimental_profile ?
+        root_profile_name :
+        Substitute("Averaged Fragment $0", fragment->display_name);
     int num_instances = exec_params_.query_schedule()
                             .fragment_exec_params(fragment->idx)
                             .instances_size();
     total_num_finstances += num_instances;
     // TODO: special-case the coordinator fragment?
-    FragmentStats* fragment_stats = obj_pool()->Add(
-        new FragmentStats(
-          avg_profile_name, root_profile_name, num_instances, obj_pool()));
+    FragmentStats* fragment_stats = obj_pool()->Add(new FragmentStats(
+        agg_profile_name, root_profile_name, num_instances, obj_pool()));
     fragment_stats_.push_back(fragment_stats);
-    query_profile_->AddChild(fragment_stats->avg_profile(), true);
-    query_profile_->AddChild(fragment_stats->root_profile());
+    query_profile_->AddChild(fragment_stats->agg_profile(), true);
+    if (!FLAGS_gen_experimental_profile) {
+      // Per-instance profiles are not included in the profile tree in profile V2.
+      query_profile_->AddChild(fragment_stats->root_profile());
+    }
   }
   COUNTER_SET(PROFILE_NumFragments.Instantiate(query_profile_),
       static_cast<int64_t>(exec_params_.num_fragments()));
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 6602733..6c566a6 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -291,12 +291,12 @@
 
   // Compute local_time for use below.
   profile()->ComputeTimeInProfile();
-  vector<RuntimeProfile*> nodes;
+  vector<RuntimeProfileBase*> nodes;
   profile()->GetAllChildren(&nodes);
   int64_t bytes_read = 0;
   int64_t scan_ranges_complete = 0;
   int64_t total_bytes_sent = 0;
-  for (RuntimeProfile* node : nodes) {
+  for (RuntimeProfileBase* node : nodes) {
     RuntimeProfile::Counter* c = node->GetCounter(PROFILE_BytesRead.name());
     if (c != nullptr) bytes_read += c->value();
     c = node->GetCounter(PROFILE_ScanRangesComplete.name());
diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h
index 1642d4e..41caffa 100644
--- a/be/src/util/dummy-runtime-profile.h
+++ b/be/src/util/dummy-runtime-profile.h
@@ -28,7 +28,7 @@
 /// but not always so that the object can still allocate counters in the same way.
 class DummyProfile {
  public:
-  DummyProfile() : pool_(), profile_(RuntimeProfile::Create(&pool_, "dummy", false)) {}
+  DummyProfile() : pool_(), profile_(RuntimeProfile::Create(&pool_, "dummy")) {}
   RuntimeProfile* profile() { return profile_; }
 
  private:
diff --git a/be/src/util/pretty-printer-test.cc b/be/src/util/pretty-printer-test.cc
index 9a3a538..88aead0 100644
--- a/be/src/util/pretty-printer-test.cc
+++ b/be/src/util/pretty-printer-test.cc
@@ -185,7 +185,7 @@
 }
 
 TEST(PrettyPrinterTest, DoubleValue) {
-  EXPECT_EQ(PrettyPrinter::Print(1.0, TUnit::DOUBLE_VALUE), "1.00 ");
+  EXPECT_EQ(PrettyPrinter::Print(1.0, TUnit::DOUBLE_VALUE), "1.00");
 }
 
 TEST(PrettyPrinterTest, StringList) {
diff --git a/be/src/util/pretty-printer.h b/be/src/util/pretty-printer.h
index a211803..166bdfa 100644
--- a/be/src/util/pretty-printer.h
+++ b/be/src/util/pretty-printer.h
@@ -133,7 +133,7 @@
 
       case TUnit::DOUBLE_VALUE: {
         double output = *reinterpret_cast<double*>(&value);
-        ss << std::setprecision(PRECISION) << output << " ";
+        ss << std::setprecision(PRECISION) << output;
         break;
       }
 
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 21a586a..472ef16 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -24,6 +24,8 @@
 #include <sys/resource.h>
 #include <sys/time.h>
 
+#include <gtest/gtest_prod.h> // for FRIEND_TEST
+
 #include "common/atomic.h"
 #include "common/logging.h"
 #include "gutil/singleton.h"
@@ -219,14 +221,14 @@
 /// ProfileEntryPrototype and pass its objects into the profile->Add.*Counter() methods.
 class CounterPrototype : public ProfileEntryPrototype {
  public:
-  CounterPrototype(const char* name, Significance significance, const char* desc,
-      TUnit::type unit): ProfileEntryPrototype(name, significance, desc, unit) {}
+  CounterPrototype(
+      const char* name, Significance significance, const char* desc, TUnit::type unit)
+    : ProfileEntryPrototype(name, significance, desc, unit) {}
 
-  RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
-      const std::string& parent_counter_name = "") {
+  RuntimeProfileBase::Counter* Instantiate(
+      RuntimeProfile* profile, const std::string& parent_counter_name = "") {
     return profile->AddCounter(name(), unit(), parent_counter_name);
   }
-
 };
 
 class DerivedCounterPrototype : public ProfileEntryPrototype {
@@ -243,16 +245,16 @@
 
 class SamplingCounterPrototype : public ProfileEntryPrototype {
  public:
-  SamplingCounterPrototype(const char* name, Significance significance, const char* desc):
-      ProfileEntryPrototype(name, significance, desc, TUnit::DOUBLE_VALUE) {}
+  SamplingCounterPrototype(const char* name, Significance significance, const char* desc)
+    : ProfileEntryPrototype(name, significance, desc, TUnit::DOUBLE_VALUE) {}
 
-  RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
-      RuntimeProfile::Counter* src_counter) {
+  RuntimeProfileBase::Counter* Instantiate(
+      RuntimeProfile* profile, RuntimeProfileBase::Counter* src_counter) {
     return profile->AddSamplingCounter(name(), src_counter);
   }
 
-  RuntimeProfile::Counter* Instantiate(RuntimeProfile* profile,
-      boost::function<int64_t ()> sample_fn) {
+  RuntimeProfileBase::Counter* Instantiate(
+      RuntimeProfile* profile, boost::function<int64_t()> sample_fn) {
     return profile->AddSamplingCounter(name(), sample_fn);
   }
 };
@@ -271,37 +273,39 @@
 
 class TimeSeriesCounterPrototype : public ProfileEntryPrototype {
  public:
-  TimeSeriesCounterPrototype(const char* name, Significance significance,
-      const char* desc, TUnit::type unit):
-      ProfileEntryPrototype(name, significance, desc, unit) {}
+  TimeSeriesCounterPrototype(
+      const char* name, Significance significance, const char* desc, TUnit::type unit)
+    : ProfileEntryPrototype(name, significance, desc, unit) {}
 
-  RuntimeProfile::TimeSeriesCounter* operator()(RuntimeProfile* profile,
-      RuntimeProfile::Counter* src_counter) {
+  RuntimeProfile::TimeSeriesCounter* operator()(
+      RuntimeProfile* profile, RuntimeProfileBase::Counter* src_counter) {
     DCHECK(src_counter->unit() == unit());
     return profile->AddSamplingTimeSeriesCounter(name(), src_counter);
   }
 
-  RuntimeProfile::TimeSeriesCounter* Instantiate(RuntimeProfile* profile,
-      RuntimeProfile::Counter* src_counter) {
+  RuntimeProfile::TimeSeriesCounter* Instantiate(
+      RuntimeProfile* profile, RuntimeProfileBase::Counter* src_counter) {
     return (*this)(profile, src_counter);
   }
 };
 
 class RateCounterPrototype : public ProfileEntryPrototype {
  public:
-  RateCounterPrototype(const char* name, Significance significance,
-      const char* desc, TUnit::type unit):
-      ProfileEntryPrototype(name, significance, desc, unit) {}
+  RateCounterPrototype(
+      const char* name, Significance significance, const char* desc, TUnit::type unit)
+    : ProfileEntryPrototype(name, significance, desc, unit) {}
 
-  RuntimeProfile::Counter* operator()(
-      RuntimeProfile* profile, RuntimeProfile::Counter* src_counter) {
-    RuntimeProfile::Counter* new_counter = profile->AddRateCounter(name(), src_counter);
+  RuntimeProfileBase::Counter* operator()(
+      RuntimeProfile* profile, RuntimeProfileBase::Counter* src_counter) {
+    RuntimeProfileBase::Counter* new_counter =
+        profile->AddRateCounter(name(), src_counter);
     DCHECK_EQ(unit(), new_counter->unit());
     return new_counter;
   }
-  RuntimeProfile::Counter* Instantiate(
-      RuntimeProfile* profile, RuntimeProfile::Counter* src_counter) {
-    RuntimeProfile::Counter* new_counter = profile->AddRateCounter(name(), src_counter);
+  RuntimeProfileBase::Counter* Instantiate(
+      RuntimeProfile* profile, RuntimeProfileBase::Counter* src_counter) {
+    RuntimeProfileBase::Counter* new_counter =
+        profile->AddRateCounter(name(), src_counter);
     DCHECK_EQ(unit(), new_counter->unit());
     return new_counter;
   }
@@ -319,10 +323,9 @@
   }
 };
 
-
 /// A counter that keeps track of the highest value seen (reporting that
 /// as value()) and the current value.
-class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
+class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfileBase::Counter {
  public:
   HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
 
@@ -371,11 +374,10 @@
 
 /// A DerivedCounter also has a name and unit, but the value is computed.
 /// Do not call Set() and Add().
-class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
+class RuntimeProfile::DerivedCounter : public RuntimeProfileBase::Counter {
  public:
   DerivedCounter(TUnit::type unit, const SampleFunction& counter_fn)
-    : Counter(unit),
-      counter_fn_(counter_fn) {}
+    : Counter(unit), counter_fn_(counter_fn) {}
 
   int64_t value() const override {
     return counter_fn_();
@@ -389,41 +391,23 @@
 /// average of the values in that set. The average is updated through calls
 /// to UpdateCounter(), which may add a new counter or update an existing counter.
 /// Set() and Add() should not be called.
-class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter {
+/// TODO: IMPALA-9382: rename counter. CounterVector? AggregatedCounter?
+/// TODO: IMPALA-9382: consider adding more descriptive stats, e.g. median.
+class RuntimeProfileBase::AveragedCounter : public RuntimeProfileBase::Counter {
  public:
-  AveragedCounter(TUnit::type unit)
-   : Counter(unit),
-     current_double_sum_(0.0),
-     current_int_sum_(0) {
-  }
+  /// Construct an empty counter with no values added.
+  AveragedCounter(TUnit::type unit, int num_samples);
 
-  /// Update counter_value_map_ with the new counter. This may require the counter
-  /// to be added to the map.
-  /// No locks are obtained within this class because UpdateCounter() is called from
-  /// UpdateAverage(), which obtains locks on the entire counter map in a profile.
-  void UpdateCounter(Counter* new_counter) {
-    DCHECK_EQ(new_counter->unit_, unit_);
-    boost::unordered_map<Counter*, int64_t>::iterator it =
-        counter_value_map_.find(new_counter);
-    int64_t old_val = 0;
-    if (it != counter_value_map_.end()) {
-      old_val = it->second;
-      it->second = new_counter->value();
-    } else {
-      counter_value_map_[new_counter] = new_counter->value();
-    }
+  /// Construct a counter from existing samples.
+  AveragedCounter(TUnit::type unit, const std::vector<bool>& has_value,
+      const std::vector<int64_t>& values);
 
-    if (unit_ == TUnit::DOUBLE_VALUE) {
-      double old_double_val = *reinterpret_cast<double*>(&old_val);
-      current_double_sum_ += (new_counter->double_value() - old_double_val);
-      double result_val = current_double_sum_ / (double) counter_value_map_.size();
-      value_.Store(*reinterpret_cast<int64_t*>(&result_val));
-    } else {
-      current_int_sum_ = ArithmeticUtil::AsUnsigned<std::plus>(
-          current_int_sum_, (new_counter->value() - old_val));
-      value_.Store(current_int_sum_ / counter_value_map_.size());
-    }
-  }
+  /// Update the counter with a new value for the input instance at 'idx'.
+  /// No locks are obtained within this method because UpdateCounter() is called from
+  /// Update(), which obtains locks on the entire counter map in a profile.
+  /// Note that it is not thread-safe to call this from two threads at the same time.
+  /// It is safe for it to be read at the same time as it is updated.
+  void UpdateCounter(Counter* new_counter, int idx);
 
   /// The value for this counter should be updated through UpdateCounter().
   /// Set() and Add() should not be used.
@@ -431,15 +415,56 @@
   void Set(int64_t value) override { DCHECK(false); }
   void Add(int64_t delta) override { DCHECK(false); }
 
- private:
-  /// Map from counters to their existing values. Modified via UpdateCounter().
-  boost::unordered_map<Counter*, int64_t> counter_value_map_;
+  void PrettyPrint(
+      const std::string& prefix, const std::string& name, std::ostream* s) const override;
 
-  /// Current sums of values from counter_value_map_. Only one of these is used,
-  /// depending on the unit of the counter. current_double_sum_ is used for
-  /// DOUBLE_VALUE, current_int_sum_ otherwise.
-  double current_double_sum_;
-  int64_t current_int_sum_;
+  void ToThrift(const std::string& name, TAggCounter* tcounter) const;
+
+  int64_t value() const override;
+
+ private:
+  FRIEND_TEST(CountersTest, AveragedCounterStats);
+
+  /// Number of values in the below arrays.
+  const int num_values_;
+
+  /// Whether we have a valid value for each input counter. Always initialized to have
+  /// 'num_samples' entries.
+  std::unique_ptr<AtomicBool[]> has_value_;
+
+  /// The value of each input counter. Always initialized to have 'num_samples' entries.
+  std::unique_ptr<AtomicInt64[]> values_;
+
+  // Stats computed in GetStats().
+  template <typename T>
+  struct Stats {
+    T min = 0;
+    T mean = 0;
+    T max = 0;
+    // Values at different percentiles.
+    T p50 = 0;
+    T p75 = 0;
+    T p90 = 0;
+    T p95 = 0;
+    int num_vals = 0;
+  };
+
+  /// Implementation of PrettyPrint parameterized by the type that 'values_' is
+  /// interpreted as - either int64_t or double, depending on the T parameter.
+  template <typename T>
+  void PrettyPrintImpl(
+      const std::string& prefix, const std::string& name, std::ostream* s) const;
+
+  /// Compute all of the stats in Stats, interpreting the values in 'vals' as type T,
+  /// which must be double if 'unit_' is DOUBLE_VALUE or int64_t otherwise.
+  template <typename T>
+  Stats<T> GetStats() const;
+
+  /// Helper for value() that computes the mean value, interpreting the values in 'vals'
+  /// as type T, which must be double if 'unit_' is DOUBLE_VALUE or int64_t otherwise.
+  /// Returns the mean value, or the double bit pattern stored in an int64_t.
+  template <typename T>
+  int64_t ComputeMean() const;
 };
 
 /// This counter records multiple values and keeps a track of the minimum, maximum and
@@ -447,15 +472,15 @@
 /// Unlike the AveragedCounter, this only keeps track of statistics of raw values
 /// whereas the AveragedCounter maintains an average of counters.
 /// value() stores the average.
-class RuntimeProfile::SummaryStatsCounter : public RuntimeProfile::Counter {
+class RuntimeProfileBase::SummaryStatsCounter : public RuntimeProfileBase::Counter {
  public:
-  SummaryStatsCounter(TUnit::type unit, int32_t total_num_values,
-      int64_t min_value, int64_t max_value, int64_t sum)
-   : Counter(unit),
-     total_num_values_(total_num_values),
-     min_(min_value),
-     max_(max_value),
-     sum_(sum) {
+  SummaryStatsCounter(TUnit::type unit, int32_t total_num_values, int64_t min_value,
+      int64_t max_value, int64_t sum)
+    : Counter(unit),
+      total_num_values_(total_num_values),
+      min_(min_value),
+      max_(max_value),
+      sum_(sum) {
     value_.Store(total_num_values == 0 ? 0 : sum / total_num_values);
   }
 
@@ -484,8 +509,19 @@
   /// Overwrites the existing counter with 'counter'
   void SetStats(const TSummaryStatsCounter& counter);
 
+  /// Overwrites the existing counter with 'counter'. Acquires lock on both counters.
+  void SetStats(const SummaryStatsCounter& other);
+
+  /// Merge 'other' into this counter. Acquires lock on both counters.
+  void Merge(const SummaryStatsCounter& other);
+
   void ToThrift(TSummaryStatsCounter* counter, const std::string& name);
 
+  /// Convert a vector of summary stats counters to an aggregate representation.
+  static void ToThrift(const std::string& name, TUnit::type unit,
+      const std::vector<SummaryStatsCounter*>& counters,
+      TAggSummaryStatsCounter* tcounter);
+
   void ToJson(rapidjson::Document& document, rapidjson::Value* val) const override {
     Counter::ToJson(document, val);
     val->AddMember("min", min_, document.GetAllocator());
@@ -494,6 +530,9 @@
     val->AddMember("num_of_samples", total_num_values_, document.GetAllocator());
   }
 
+  void PrettyPrint(
+      const std::string& prefix, const std::string& name, std::ostream* s) const override;
+
  private:
   /// The total number of values seen so far.
   int32_t total_num_values_;
@@ -504,7 +543,9 @@
   int64_t sum_;
 
   // Protects min_, max_, sum_, total_num_values_ and value_.
-  SpinLock lock_;
+  // When acquiring locks on two counters, e.g. in Merge(), the source is acquired
+  // before the destination.
+  mutable SpinLock lock_;
 };
 
 /// A set of counters that measure thread info, such as total time, user time, sys time.
@@ -835,9 +876,9 @@
 template <class T>
 class ScopedTimer {
  public:
-  ScopedTimer(RuntimeProfile::Counter* c1 = nullptr,
-      RuntimeProfile::Counter* c2 = nullptr,
-      RuntimeProfile::Counter* c3 = nullptr, const bool* is_cancelled = nullptr)
+  ScopedTimer(RuntimeProfileBase::Counter* c1 = nullptr,
+      RuntimeProfileBase::Counter* c2 = nullptr,
+      RuntimeProfileBase::Counter* c3 = nullptr, const bool* is_cancelled = nullptr)
     : counter1_(c1), counter2_(c2), counter3_(c3), is_cancelled_(is_cancelled) {
     DCHECK(c1 == nullptr || c1->unit() == TUnit::TIME_NS);
     DCHECK(c2 == nullptr || c2->unit() == TUnit::TIME_NS);
@@ -881,9 +922,9 @@
   ScopedTimer& operator=(const ScopedTimer& timer);
 
   T sw_;
-  RuntimeProfile::Counter* counter1_;
-  RuntimeProfile::Counter* counter2_;
-  RuntimeProfile::Counter* counter3_;
+  RuntimeProfileBase::Counter* counter1_;
+  RuntimeProfileBase::Counter* counter2_;
+  RuntimeProfileBase::Counter* counter3_;
   const bool* is_cancelled_;
 };
 
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index dcb5bf7..db9e532 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -18,11 +18,13 @@
 #include <stdlib.h>
 #include <algorithm>
 #include <iostream>
+#include <random>
 
 #include <boost/bind.hpp>
 
 #include "common/object-pool.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 #include "util/container-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
@@ -33,8 +35,17 @@
 DECLARE_int32(status_report_interval_ms);
 DECLARE_int32(periodic_counter_update_period_ms);
 
+using std::mt19937;
+using std::shuffle;
+
 namespace impala {
 
+/// Return true if this is one of the counters automatically added to profiles,
+/// e.g. TotalTime.
+static bool IsDefaultCounter(const string& counter_name) {
+  return counter_name == "TotalTime" || counter_name == "InactiveTotalTime";
+}
+
 TEST(CountersTest, Basic) {
   ObjectPool pool;
   RuntimeProfile* profile_a = RuntimeProfile::Create(&pool, "ProfileA");
@@ -111,18 +122,19 @@
   EXPECT_EQ(exec_summary_result.status, status);
 
   // Averaged
-  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "Merged", true);
-  averaged_profile->UpdateAverage(from_thrift);
+  AggregatedRuntimeProfile* averaged_profile =
+      AggregatedRuntimeProfile::Create(&pool, "Merged", 2, true);
+  averaged_profile->Update(from_thrift, 0);
   counter_merged = averaged_profile->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
 
-  // UpdateAverage again, there should be no change.
-  averaged_profile->UpdateAverage(from_thrift);
+  // Update again, there should be no change.
+  averaged_profile->Update(from_thrift, 0);
   EXPECT_EQ(counter_merged->value(), 1);
 
   counter_a = profile_a2->AddCounter("A", TUnit::UNIT);
   counter_a->Set(3);
-  averaged_profile->UpdateAverage(profile_a2);
+  averaged_profile->Update(profile_a2, 1);
   EXPECT_EQ(counter_merged->value(), 2);
 
   // Update
@@ -137,7 +149,7 @@
   EXPECT_EQ(counter_updated->value(), 1);
 }
 
-void ValidateCounter(RuntimeProfile* profile, const string& name, int64_t value) {
+void ValidateCounter(RuntimeProfileBase* profile, const string& name, int64_t value) {
   RuntimeProfile::Counter* counter = profile->GetCounter(name);
   EXPECT_TRUE(counter != NULL);
   EXPECT_EQ(counter->value(), value);
@@ -198,20 +210,21 @@
   // Merge the two and validate
   TRuntimeProfileTree tprofile1;
   profile1->ToThrift(&tprofile1);
-  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "merged", true);
-  averaged_profile->UpdateAverage(profile1);
-  averaged_profile->UpdateAverage(profile2);
+  AggregatedRuntimeProfile* averaged_profile =
+      AggregatedRuntimeProfile::Create(&pool, "merged", 2, true);
+  averaged_profile->Update(profile1, 0);
+  averaged_profile->Update(profile2, 1);
   EXPECT_EQ(5, averaged_profile->num_counters());
   ValidateCounter(averaged_profile, "Parent Shared", 2);
   ValidateCounter(averaged_profile, "Parent 1 Only", 2);
   ValidateCounter(averaged_profile, "Parent 2 Only", 5);
 
-  vector<RuntimeProfile*> children;
+  vector<RuntimeProfileBase*> children;
   averaged_profile->GetChildren(&children);
   EXPECT_EQ(children.size(), 3);
 
   for (int i = 0; i < 3; ++i) {
-    RuntimeProfile* profile = children[i];
+    RuntimeProfileBase* profile = children[i];
     if (profile->name().compare("Child1") == 0) {
       EXPECT_EQ(5, profile->num_counters());
       ValidateCounter(profile, "Child1 Shared", 15);
@@ -243,7 +256,7 @@
   EXPECT_EQ(children.size(), 3);
 
   for (int i = 0; i < 3; ++i) {
-    RuntimeProfile* profile = children[i];
+    RuntimeProfileBase* profile = children[i];
     if (profile->name().compare("Child1") == 0) {
       EXPECT_EQ(5, profile->num_counters());
       ValidateCounter(profile, "Child1 Shared", 10);
@@ -276,12 +289,13 @@
   profile1->ToThrift(&tprofile1_v1);
 
   // Update averaged and deserialized profiles from the serialized profile.
-  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "merged", true);
+  AggregatedRuntimeProfile* averaged_profile =
+      AggregatedRuntimeProfile::Create(&pool, "merged", 2, true);
   RuntimeProfile* deserialized_profile = RuntimeProfile::Create(&pool, "Parent");
-  averaged_profile->UpdateAverage(profile1);
+  averaged_profile->Update(profile1, 0);
   deserialized_profile->Update(tprofile1_v1);
 
-  std::vector<RuntimeProfile*> tmp_children;
+  std::vector<RuntimeProfileBase*> tmp_children;
   averaged_profile->GetChildren(&tmp_children);
   EXPECT_EQ(1, tmp_children.size());
   EXPECT_EQ("Child2", tmp_children[0]->name());
@@ -293,7 +307,7 @@
   RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1");
   profile1->PrependChild(p1_child1);
   profile1->ToThrift(&tprofile1_v2);
-  averaged_profile->UpdateAverage(profile1);
+  averaged_profile->Update(profile1, 0);
   deserialized_profile->Update(tprofile1_v2);
 
   averaged_profile->GetChildren(&tmp_children);
@@ -315,7 +329,7 @@
   EXPECT_EQ("Child2", tmp_children[0]->name());
   EXPECT_EQ("Child1", tmp_children[1]->name());
   profile1->ToThrift(&tprofile1_v3);
-  averaged_profile->UpdateAverage(profile1);
+  averaged_profile->Update(profile1, 0);
   deserialized_profile->Update(tprofile1_v2);
 
   // The previous order of children that were already present is preserved.
@@ -553,20 +567,20 @@
       profile->AddCounter("bytes 2", TUnit::BYTES);
 
   bytes_1_counter->Set(10);
-  RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES);
-  bytes_avg.UpdateCounter(bytes_1_counter);
+  RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES, 2);
+  bytes_avg.UpdateCounter(bytes_1_counter, 0);
   // Avg of 10L
   EXPECT_EQ(bytes_avg.value(), 10);
   bytes_1_counter->Set(20L);
-  bytes_avg.UpdateCounter(bytes_1_counter);
+  bytes_avg.UpdateCounter(bytes_1_counter, 0);
   // Avg of 20L
   EXPECT_EQ(bytes_avg.value(), 20);
   bytes_2_counter->Set(40L);
-  bytes_avg.UpdateCounter(bytes_2_counter);
+  bytes_avg.UpdateCounter(bytes_2_counter, 1);
   // Avg of 20L and 40L
   EXPECT_EQ(bytes_avg.value(), 30);
   bytes_2_counter->Set(30L);
-  bytes_avg.UpdateCounter(bytes_2_counter);
+  bytes_avg.UpdateCounter(bytes_2_counter, 1);
   // Avg of 20L and 30L
   EXPECT_EQ(bytes_avg.value(), 25);
 
@@ -575,24 +589,70 @@
   RuntimeProfile::Counter* double_2_counter =
       profile->AddCounter("double 2", TUnit::DOUBLE_VALUE);
   double_1_counter->Set(1.0f);
-  RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE);
-  double_avg.UpdateCounter(double_1_counter);
+  RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE, 2);
+  double_avg.UpdateCounter(double_1_counter, 0);
   // Avg of 1.0f
   EXPECT_EQ(double_avg.double_value(), 1.0f);
   double_1_counter->Set(2.0f);
-  double_avg.UpdateCounter(double_1_counter);
+  double_avg.UpdateCounter(double_1_counter, 0);
   // Avg of 2.0f
   EXPECT_EQ(double_avg.double_value(), 2.0f);
   double_2_counter->Set(4.0f);
-  double_avg.UpdateCounter(double_2_counter);
+  double_avg.UpdateCounter(double_2_counter, 1);
   // Avg of 2.0f and 4.0f
   EXPECT_EQ(double_avg.double_value(), 3.0f);
   double_2_counter->Set(3.0f);
-  double_avg.UpdateCounter(double_2_counter);
+  double_avg.UpdateCounter(double_2_counter, 1);
   // Avg of 2.0f and 3.0f
   EXPECT_EQ(double_avg.double_value(), 2.5f);
 }
 
+TEST(CountersTest, AveragedCounterStats) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+  // Average 100 input counters with values 100-199.
+  const int NUM_COUNTERS = 100;
+  vector<RuntimeProfile::Counter*> counters;
+  for (int i = 0; i < NUM_COUNTERS; ++i) {
+    counters.push_back(
+        profile->AddCounter(Substitute("c$0", i), TUnit::BYTES));
+    counters.back()->Set(100 + i);
+  }
+  // Randomize counter order - computed stats shouldn't depend on order.
+  mt19937 rng;
+  RandTestUtil::SeedRng("RUNTIME_PROFILE_TEST_SEED", &rng);
+  shuffle(counters.begin(), counters.end(), rng);
+
+  RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES, NUM_COUNTERS);
+  for (int i = 0; i < NUM_COUNTERS; ++i) {
+    bytes_avg.UpdateCounter(counters[i], i);
+  }
+  RuntimeProfile::AveragedCounter::Stats<int64_t> stats = bytes_avg.GetStats<int64_t>();
+  EXPECT_EQ(NUM_COUNTERS, stats.num_vals);
+  EXPECT_EQ(100, stats.min);
+  EXPECT_EQ(199, stats.max);
+  EXPECT_EQ(149, stats.mean);
+  EXPECT_EQ(149, stats.p50);
+  EXPECT_EQ(174, stats.p75);
+  EXPECT_EQ(189, stats.p90);
+  EXPECT_EQ(194, stats.p95);
+
+  // Round-trip via thrift and confirm values are all the same.
+  TAggCounter tcounter;
+  bytes_avg.ToThrift("", &tcounter);
+  RuntimeProfile::AveragedCounter bytes_avg2(
+      TUnit::BYTES, tcounter.has_value, tcounter.values);
+  stats = bytes_avg2.GetStats<int64_t>();
+  EXPECT_EQ(NUM_COUNTERS, stats.num_vals);
+  EXPECT_EQ(100, stats.min);
+  EXPECT_EQ(199, stats.max);
+  EXPECT_EQ(149, stats.mean);
+  EXPECT_EQ(149, stats.p50);
+  EXPECT_EQ(174, stats.p75);
+  EXPECT_EQ(189, stats.p90);
+  EXPECT_EQ(194, stats.p95);
+}
+
 TEST(CountersTest, InfoStringTest) {
   ObjectPool pool;
   RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
@@ -1220,7 +1280,7 @@
   EXPECT_EQ("Value", content["info_strings"][0]["value"]);
 
   // Check counter value matches
-  EXPECT_EQ(2, content["counters"].Size());
+  EXPECT_EQ(4, content["counters"].Size());
   for (auto& itr : content["counters"].GetArray()) {
     // check normal Counter
     if (itr["counter_name"] == "A") {
@@ -1231,7 +1291,8 @@
       EXPECT_EQ(20, itr["value"].GetInt());
       EXPECT_EQ("BYTES", itr["unit"]);
     } else {
-      EXPECT_TRUE(false) << itr["counter_name"].GetString();
+      EXPECT_TRUE(IsDefaultCounter(itr["counter_name"].GetString()))
+          << itr["counter_name"].GetString();
     }
   }
 
@@ -1264,11 +1325,16 @@
   // Empty profile should not have following members
   EXPECT_TRUE(!content.HasMember("info_strings"));
   EXPECT_TRUE(!content.HasMember("event_sequences"));
-  EXPECT_TRUE(!content.HasMember("counters"));
   EXPECT_TRUE(!content.HasMember("summary_stats_counters"));
   EXPECT_TRUE(!content.HasMember("time_series_counters"));
   EXPECT_TRUE(!content.HasMember("child_profiles"));
 
+  // Only default counters should be present.
+  EXPECT_EQ(2, content["counters"].Size());
+  for (auto& itr : content["counters"].GetArray()) {
+    EXPECT_TRUE(IsDefaultCounter(itr["counter_name"].GetString()))
+        << itr["counter_name"].GetString();
+  }
 }
 
 TEST(ToJson, EventSequenceToJsonTest) {
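Note on the AveragedCounterStats test above: the expected values (mean 149, p50 149, p75 174, p90 189, p95 194 for inputs 100-199) are consistent with an integer mean and a percentile index of (n - 1) * p / 100 into the sorted values. The sketch below works under that assumption. The real GetStats() implementation is not part of this section, and StatsSketch and ComputeStatsSketch are hypothetical names. The has_value and values parameters mirror the TAggCounter fields the test passes to the AveragedCounter constructor.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical result struct; field names follow the ones used in the test.
struct StatsSketch {
  int num_vals = 0;
  int64_t min = 0, max = 0, mean = 0;
  int64_t p50 = 0, p75 = 0, p90 = 0, p95 = 0;
};

// Compute descriptive stats over the per-instance values that are present.
// Assumed rule: percentile p is the sorted value at index (n - 1) * p / 100.
StatsSketch ComputeStatsSketch(
    const std::vector<int64_t>& values, const std::vector<bool>& has_value) {
  assert(values.size() == has_value.size());
  std::vector<int64_t> present;
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (has_value[i]) present.push_back(values[i]);
  }
  StatsSketch s;
  if (present.empty()) return s;
  // Sorting makes the result independent of the order instances reported in.
  std::sort(present.begin(), present.end());
  const int64_t n = static_cast<int64_t>(present.size());
  s.num_vals = static_cast<int>(n);
  s.min = present.front();
  s.max = present.back();
  // Integer division truncates, e.g. 149.5 becomes 149.
  s.mean = std::accumulate(present.begin(), present.end(), int64_t{0}) / n;
  auto pct = [&](int p) { return present[(n - 1) * p / 100]; };
  s.p50 = pct(50);
  s.p75 = pct(75);
  s.p90 = pct(90);
  s.p95 = pct(95);
  return s;
}
```

Instances that have not reported yet (has_value[i] == false) are skipped, so num_vals can be smaller than the number of input profiles.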
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index a78bfef..79453c0 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -21,8 +21,11 @@
 #include <iomanip>
 #include <iostream>
 #include <mutex>
+#include <numeric>
+#include <type_traits>
 #include <utility>
 
+#include <boost/algorithm/string/join.hpp>
 #include <boost/bind.hpp>
 
 #include "common/object-pool.h"
@@ -46,6 +49,10 @@
 DECLARE_int32(status_report_interval_ms);
 DECLARE_int32(periodic_counter_update_period_ms);
 
+DEFINE_bool_hidden(gen_experimental_profile, false,
+    "(Experimental) generate a new aggregated runtime profile layout. Format is subject "
+    "to change.");
+
 using namespace rapidjson;
 
 namespace impala {
@@ -60,12 +67,34 @@
 // The root counter name for all top level counters.
 static const string ROOT_COUNTER = "";
 
-const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
-const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime";
-const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
+const string RuntimeProfileBase::TOTAL_TIME_COUNTER_NAME = "TotalTime";
+const string RuntimeProfileBase::LOCAL_TIME_COUNTER_NAME = "LocalTime";
+const string RuntimeProfileBase::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
 
 constexpr ProfileEntryPrototype::Significance ProfileEntryPrototype::ALLSIGNIFICANCE[];
 
+/// Helper to interpret the bit pattern of 'val' as T, which can either be an int64_t or
+/// a double.
+template <typename T>
+static T BitcastFromInt64(int64_t val) {
+  static_assert(std::is_same<T, int64_t>::value || std::is_same<T, double>::value,
+      "Only double and int64_t are supported");
+  T res;
+  memcpy(&res, &val, sizeof(int64_t));
+  return res;
+}
+
+/// Helper to store the bit pattern of 'val' as an int64_t, T can either be an int64_t or
+/// a double.
+template <typename T>
+static int64_t BitcastToInt64(T val) {
+  static_assert(std::is_same<T, int64_t>::value || std::is_same<T, double>::value,
+      "Only double and int64_t are supported");
+  int64_t res;
+  memcpy(&res, &val, sizeof(int64_t));
+  return res;
+}
+
 void ProfileEntryPrototypeRegistry::AddPrototype(const ProfileEntryPrototype* prototype) {
   lock_guard<SpinLock> l(lock_);
   DCHECK(prototypes_.find(prototype->name()) == prototypes_.end()) <<
@@ -129,29 +158,22 @@
   }
 }
 
-RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name,
-    bool is_averaged_profile) {
-  return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile));
+RuntimeProfileBase::RuntimeProfileBase(ObjectPool* pool, const string& name)
+  : pool_(pool), name_(name) {}
+
+RuntimeProfileBase::~RuntimeProfileBase() {}
+
+RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name) {
+  return pool->Add(new RuntimeProfile(pool, name));
 }
 
-RuntimeProfile::RuntimeProfile(
-    ObjectPool* pool, const string& name, bool is_averaged_profile)
-  : pool_(pool),
-    name_(name),
-    is_averaged_profile_(is_averaged_profile),
-    counter_total_time_(TUnit::TIME_NS),
-    inactive_timer_(TUnit::TIME_NS) {
-  Counter* total_time_counter;
-  Counter* inactive_timer;
-  if (!is_averaged_profile) {
-    total_time_counter = &counter_total_time_;
-    inactive_timer = &inactive_timer_;
-  } else {
-    total_time_counter = pool->Add(new AveragedCounter(TUnit::TIME_NS));
-    inactive_timer = pool->Add(new AveragedCounter(TUnit::TIME_NS));
-  }
-  counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter;
-  counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer;
+RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name)
+  : RuntimeProfileBase(pool, name) {
+  set<string>& root_counters = child_counter_map_[ROOT_COUNTER];
+  counter_map_[TOTAL_TIME_COUNTER_NAME] = &counter_total_time_;
+  root_counters.emplace(TOTAL_TIME_COUNTER_NAME);
+  counter_map_[INACTIVE_TIME_COUNTER_NAME] = &inactive_timer_;
+  root_counters.emplace(INACTIVE_TIME_COUNTER_NAME);
 }
 
 RuntimeProfile::~RuntimeProfile() {
@@ -180,27 +202,67 @@
     const TRuntimeProfileTree& profiles) {
   if (profiles.nodes.size() == 0) return NULL;
   int idx = 0;
-  RuntimeProfile* profile = RuntimeProfile::CreateFromThrift(pool, profiles.nodes, &idx);
-  profile->SetTExecSummary(profiles.exec_summary);
-  return profile;
+  RuntimeProfileBase* profile =
+      RuntimeProfileBase::CreateFromThriftHelper(pool, profiles.nodes, &idx);
+  // The root must always be a RuntimeProfile, not an AggregatedRuntimeProfile.
+  RuntimeProfile* root = dynamic_cast<RuntimeProfile*>(profile);
+  DCHECK(root != nullptr);
+  root->SetTExecSummary(profiles.exec_summary);
+  // Some values like local time are not serialized to Thrift and need to be
+  // recomputed.
+  root->ComputeTimeInProfile();
+  return root;
 }
 
-RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
-    const vector<TRuntimeProfileNode>& nodes, int* idx) {
+RuntimeProfileBase* RuntimeProfileBase::CreateFromThriftHelper(
+    ObjectPool* pool, const vector<TRuntimeProfileNode>& nodes, int* idx) {
   DCHECK_LT(*idx, nodes.size());
 
   const TRuntimeProfileNode& node = nodes[*idx];
-  RuntimeProfile* profile = Create(pool, node.name);
+  RuntimeProfileBase* profile;
+  if (FLAGS_gen_experimental_profile && node.__isset.aggregated) {
+    DCHECK(node.aggregated.__isset.num_instances);
+    profile = AggregatedRuntimeProfile::Create(pool, node.name,
+        node.aggregated.num_instances, node.aggregated.__isset.input_profiles);
+  } else {
+    // If we're not using the transposed profile representation, just convert
+    // the averaged profile to a regular profile (this is what this code
+    // always did in the past).
+    profile = RuntimeProfile::Create(pool, node.name);
+  }
   profile->metadata_ = node.node_metadata;
+  profile->InitFromThrift(node, pool);
+  profile->child_counter_map_ = node.child_counters_map;
+
+  // TODO: IMPALA-9382: move to RuntimeProfile::InitFromThrift() once 'info_strings_' is
+  // moved.
+  profile->info_strings_ = node.info_strings;
+  profile->info_strings_display_order_ = node.info_strings_display_order;
+
+  ++*idx;
+  {
+    lock_guard<SpinLock> l(profile->children_lock_);
+    for (int i = 0; i < node.num_children; ++i) {
+      bool indent = nodes[*idx].indent;
+      profile->AddChildLocked(
+          RuntimeProfileBase::CreateFromThriftHelper(pool, nodes, idx),
+          indent, profile->children_.end());
+    }
+  }
+  return profile;
+}
+
+void RuntimeProfile::InitFromThrift(const TRuntimeProfileNode& node, ObjectPool* pool) {
+  // Only read 'counters' for non-aggregated profiles. Aggregated profiles will populate
+  // 'counter_map_' from the aggregated counters in thrift.
   for (int i = 0; i < node.counters.size(); ++i) {
     const TCounter& counter = node.counters[i];
-    profile->counter_map_[counter.name] =
-        pool->Add(new Counter(counter.unit, counter.value));
+    counter_map_[counter.name] = pool->Add(new Counter(counter.unit, counter.value));
   }
 
   if (node.__isset.event_sequences) {
     for (const TEventSequence& sequence: node.event_sequences) {
-      profile->event_sequence_map_[sequence.name] =
+      event_sequence_map_[sequence.name] =
           pool->Add(new EventSequence(sequence.timestamps, sequence.labels));
     }
   }
@@ -209,36 +271,37 @@
     for (const TTimeSeriesCounter& val: node.time_series_counters) {
       // Capture all incoming time series counters with the same type since re-sampling
       // will have happened on the sender side.
-      profile->time_series_counter_map_[val.name] = pool->Add(
+      time_series_counter_map_[val.name] = pool->Add(
           new ChunkedTimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
     }
   }
 
   if (node.__isset.summary_stats_counters) {
     for (const TSummaryStatsCounter& val: node.summary_stats_counters) {
-      profile->summary_stats_map_[val.name] =
-          pool->Add(new SummaryStatsCounter(
-              val.unit, val.total_num_values, val.min_value, val.max_value, val.sum));
+      summary_stats_map_[val.name] = pool->Add(new SummaryStatsCounter(
+          val.unit, val.total_num_values, val.min_value, val.max_value, val.sum));
     }
   }
-
-  profile->child_counter_map_ = node.child_counters_map;
-  profile->info_strings_ = node.info_strings;
-  profile->info_strings_display_order_ = node.info_strings_display_order;
-
-  ++*idx;
-  for (int i = 0; i < node.num_children; ++i) {
-    bool indent = nodes[*idx].indent;
-    profile->AddChild(RuntimeProfile::CreateFromThrift(pool, nodes, idx), indent);
-  }
-  // Compute timers that are not serialized to the thrift.
-  profile->ComputeTimeInProfile();
-  return profile;
 }
 
-void RuntimeProfile::UpdateAverage(RuntimeProfile* other) {
+void AggregatedRuntimeProfile::Update(RuntimeProfile* other, int idx) {
+  {
+    lock_guard<SpinLock> l(input_profile_name_lock_);
+    DCHECK(!input_profile_names_.empty())
+        << "Update() can only be called on root of averaged profile tree";
+    input_profile_names_[idx] = other->name();
+  }
+
+  UpdateRecursive(other, idx);
+
+  // Recursively compute times on the whole tree.
+  ComputeTimeInProfile();
+}
+
+void AggregatedRuntimeProfile::UpdateRecursive(RuntimeProfile* other, int idx) {
   DCHECK(other != NULL);
-  DCHECK(is_averaged_profile_);
+  DCHECK_GE(idx, 0);
+  DCHECK_LT(idx, num_input_profiles_);
 
   // Merge this level
   {
@@ -248,26 +311,22 @@
     lock_guard<SpinLock> m(other->counter_map_lock_);
     for (src_iter = other->counter_map_.begin();
          src_iter != other->counter_map_.end(); ++src_iter) {
-
-      // Ignore this counter for averages.
-      if (src_iter->first == INACTIVE_TIME_COUNTER_NAME) continue;
-
       dst_iter = counter_map_.find(src_iter->first);
       AveragedCounter* avg_counter;
 
       // Get the counter with the same name in dst_iter (this->counter_map_)
       // Create one if it doesn't exist.
       if (dst_iter == counter_map_.end()) {
-        avg_counter = pool_->Add(new AveragedCounter(src_iter->second->unit()));
+        avg_counter = pool_->Add(
+            new AveragedCounter(src_iter->second->unit(), num_input_profiles_));
         counter_map_[src_iter->first] = avg_counter;
       } else {
         DCHECK(dst_iter->second->unit() == src_iter->second->unit());
         avg_counter = static_cast<AveragedCounter*>(dst_iter->second);
       }
-      avg_counter->UpdateCounter(src_iter->second);
+      avg_counter->UpdateCounter(src_iter->second, idx);
     }
 
-    // TODO: Can we unlock the counter_map_lock_ here?
     ChildCounterMap::const_iterator child_counter_src_itr;
     for (child_counter_src_itr = other->child_counter_map_.begin();
          child_counter_src_itr != other->child_counter_map_.end();
@@ -279,21 +338,62 @@
     }
   }
 
+  if (FLAGS_gen_experimental_profile) {
+    lock_guard<SpinLock> l(agg_info_strings_lock_);
+    lock_guard<SpinLock> m(other->info_strings_lock_);
+    for (const auto& entry : other->info_strings_) {
+      vector<string>& values = agg_info_strings_[entry.first];
+      if (values.empty()) values.resize(num_input_profiles_);
+      if (values[idx] != entry.second) values[idx] = entry.second;
+    }
+  }
+
+  if (FLAGS_gen_experimental_profile) {
+    // Merge summary stats.
+    lock_guard<SpinLock> l(summary_stats_map_lock_);
+    lock_guard<SpinLock> m(other->summary_stats_map_lock_);
+    for (const RuntimeProfile::SummaryStatsCounterMap::value_type& src_entry :
+        other->summary_stats_map_) {
+      DCHECK_GT(num_input_profiles_, 0);
+      AggSummaryStatsCounterMap::mapped_type& agg_entry =
+          summary_stats_map_[src_entry.first];
+      vector<SummaryStatsCounter*>& agg_instance_counters = agg_entry.second;
+      if (agg_instance_counters.empty()) {
+        agg_instance_counters.resize(num_input_profiles_);
+        agg_entry.first = src_entry.second->unit();
+      } else {
+        DCHECK_EQ(agg_entry.first, src_entry.second->unit()) << "Unit must be consistent";
+      }
+
+      // Get the counter with the same name.  Create one if it doesn't exist.
+      if (agg_instance_counters[idx] == nullptr) {
+        agg_instance_counters[idx] =
+            pool_->Add(new SummaryStatsCounter(src_entry.second->unit()));
+      }
+      // Overwrite the previous value with the new value.
+      agg_instance_counters[idx]->SetStats(*src_entry.second);
+    }
+  }
+
   {
     lock_guard<SpinLock> l(children_lock_);
     lock_guard<SpinLock> m(other->children_lock_);
     // Recursively merge children with matching names.
     // Track the current position in the vector so we preserve the order of children
-    // if children are added after the first Update()/UpdateAverage() call (IMPALA-6694).
+    // if children are added after the first Update() call (IMPALA-6694).
     // E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
     // then this code makes sure that children_ is [A, B, C, D] afterwards.
     ChildVector::iterator insert_pos = children_.begin();
     for (int i = 0; i < other->children_.size(); ++i) {
-      RuntimeProfile* other_child = other->children_[i].first;
-      ChildMap::iterator j = child_map_.find(other_child->name_);
-      RuntimeProfile* child = NULL;
+      RuntimeProfile* other_child =
+          dynamic_cast<RuntimeProfile*>(other->children_[i].first);
+      DCHECK(other_child != nullptr)
+          << other->children_[i].first->name() << " must be a RuntimeProfile";
+      ChildMap::iterator j = child_map_.find(other_child->name());
+      AggregatedRuntimeProfile* child = NULL;
       if (j != child_map_.end()) {
-        child = j->second;
+        child = dynamic_cast<AggregatedRuntimeProfile*>(j->second);
+        DCHECK(child != nullptr);
         // Search forward until the insert position is either at the end of the vector
         // or after this child. This preserves the order if the relative order of
         // children in all updates is consistent.
@@ -303,24 +403,25 @@
           ++insert_pos;
         }
       } else {
-        child = Create(pool_, other_child->name_, true);
-        child->metadata_ = other_child->metadata_;
+        child =
+            Create(pool_, other_child->name(), num_input_profiles_, /*is_root=*/false);
+        child->metadata_ = other_child->metadata();
         bool indent_other_child = other->children_[i].second;
         child_map_[child->name_] = child;
         insert_pos = children_.insert(insert_pos, make_pair(child, indent_other_child));
         ++insert_pos;
       }
-      child->UpdateAverage(other_child);
+      child->UpdateRecursive(other_child, idx);
     }
   }
-
-  ComputeTimeInProfile();
 }
 
 void RuntimeProfile::Update(const TRuntimeProfileTree& thrift_profile) {
   int idx = 0;
   Update(thrift_profile.nodes, &idx);
   DCHECK_EQ(idx, thrift_profile.nodes.size());
+  // Re-compute the total time for the entire profile tree.
+  ComputeTimeInProfile();
 }
 
 void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx) {
@@ -425,7 +526,7 @@
   {
     lock_guard<SpinLock> l(children_lock_);
     // Track the current position in the vector so we preserve the order of children
-    // if children are added after the first Update()/UpdateAverage() call (IMPALA-6694).
+    // if children are added after the first Update() call (IMPALA-6694).
     // E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
     // then this code makes sure that children_ is [A, B, C, D] afterwards.
     ChildVector::iterator insert_pos = children_.begin();
@@ -435,7 +536,8 @@
       ChildMap::iterator j = child_map_.find(tchild.name);
       RuntimeProfile* child = NULL;
       if (j != child_map_.end()) {
-        child = j->second;
+        child = dynamic_cast<RuntimeProfile*>(j->second);
+        DCHECK(child != nullptr) << j->second->name() << " must be a RuntimeProfile";
         // Search forward until the insert position is either at the end of the vector
         // or after this child. This preserves the order if the relative order of
         // children in all updates is consistent.
@@ -456,52 +558,20 @@
   }
 }
 
-void RuntimeProfile::Divide(int n) {
-  DCHECK_GT(n, 0);
-  map<string, Counter*>::iterator iter;
-  {
-    lock_guard<SpinLock> l(counter_map_lock_);
-    for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) {
-      if (iter->second->unit() == TUnit::DOUBLE_VALUE) {
-        iter->second->Set(iter->second->double_value() / n);
-      } else {
-        iter->second->value_.Store(iter->second->value() / n);
-      }
-    }
-  }
-  {
-    lock_guard<SpinLock> l(children_lock_);
-    for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
-      i->second->Divide(n);
-    }
-  }
-}
-
-void RuntimeProfile::ComputeTimeInProfile() {
-  ComputeTimeInProfile(total_time_counter()->value());
-}
-
-void RuntimeProfile::ComputeTimeInProfile(int64_t total) {
+void RuntimeProfileBase::ComputeTimeInProfile() {
   // Recurse on children. After this, childrens' total time is up to date.
-  {
-    lock_guard<SpinLock> l(children_lock_);
-    for (int i = 0; i < children_.size(); ++i) {
-      children_[i].first->ComputeTimeInProfile();
-    }
-  }
-
-  // Get total time from children
   int64_t children_total_time = 0;
   {
     lock_guard<SpinLock> l(children_lock_);
     for (int i = 0; i < children_.size(); ++i) {
+      children_[i].first->ComputeTimeInProfile();
       children_total_time += children_[i].first->total_time();
     }
   }
   // IMPALA-5200: Take the max, because the parent includes all of the time from the
   // children, whether or not its total time counter has been updated recently enough
   // to see this.
-  total_time_ns_.Store(max(children_total_time, total_time_counter()->value()));
+  int64_t total_time_ns = max(children_total_time, total_time_counter()->value());
 
   // If a local time counter exists, use its value as local time. Otherwise, derive the
   // local time from total time and the child time.
@@ -517,20 +587,20 @@
   }
 
   if (!has_local_time_counter) {
-    local_time_ns = total_time_ns_.Load() - children_total_time;
-    if (!is_averaged_profile_) {
-      local_time_ns -= inactive_timer()->value();
-    }
+    local_time_ns = total_time_ns - children_total_time;
+    local_time_ns -= inactive_timer()->value();
   }
   // Counters have some margin, set to 0 if it was negative.
   local_time_ns = ::max<int64_t>(0, local_time_ns);
-  local_time_ns_.Store(local_time_ns);
   double local_time_frac =
-      min(1.0, static_cast<double>(local_time_ns) / total_time_ns_.Load());
-  local_time_frac_.Store(*reinterpret_cast<int64_t*>(&local_time_frac));
+      min(1.0, static_cast<double>(local_time_ns) / total_time_ns);
+  total_time_ns_.Store(total_time_ns);
+  local_time_ns_.Store(local_time_ns);
+  local_time_frac_.Store(BitcastToInt64(local_time_frac));
 }
 
-void RuntimeProfile::AddChild(RuntimeProfile* child, bool indent, RuntimeProfile* loc) {
+void RuntimeProfile::AddChild(
+    RuntimeProfileBase* child, bool indent, RuntimeProfile* loc) {
   lock_guard<SpinLock> l(children_lock_);
   ChildVector::iterator insert_pos;
   if (loc == NULL) {
@@ -549,20 +619,20 @@
   AddChildLocked(child, indent, insert_pos);
 }
 
-void RuntimeProfile::AddChildLocked(
-    RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos) {
+void RuntimeProfileBase::AddChildLocked(
+    RuntimeProfileBase* child, bool indent, ChildVector::iterator insert_pos) {
   children_lock_.DCheckLocked();
   DCHECK(child != NULL);
-  if (child_map_.count(child->name_) > 0) {
+  if (child_map_.count(child->name()) > 0) {
     // This child has already been added, so do nothing.
     // Otherwise, the map and vector will be out of sync.
     return;
   }
-  child_map_[child->name_] = child;
+  child_map_[child->name()] = child;
   children_.insert(insert_pos, make_pair(child, indent));
 }
 
-void RuntimeProfile::PrependChild(RuntimeProfile* child, bool indent) {
+void RuntimeProfile::PrependChild(RuntimeProfileBase* child, bool indent) {
   lock_guard<SpinLock> l(children_lock_);
   AddChildLocked(child, indent, children_.begin());
 }
@@ -576,13 +646,13 @@
   return child;
 }
 
-void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) {
+void RuntimeProfileBase::GetChildren(vector<RuntimeProfileBase*>* children) {
   children->clear();
   lock_guard<SpinLock> l(children_lock_);
   for (const auto& entry : children_) children->push_back(entry.first);
 }
 
-void RuntimeProfile::GetAllChildren(vector<RuntimeProfile*>* children) {
+void RuntimeProfileBase::GetAllChildren(vector<RuntimeProfileBase*>* children) {
   lock_guard<SpinLock> l(children_lock_);
   for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
     children->push_back(i->second);
@@ -590,6 +660,11 @@
   }
 }
 
+int RuntimeProfileBase::num_counters() const {
+  std::lock_guard<SpinLock> l(counter_map_lock_);
+  return counter_map_.size();
+}
+
 void RuntimeProfile::SortChildrenByTotalTime() {
   lock_guard<SpinLock> l(children_lock_);
   // Create a snapshot of total time values so that they don't change while we're
@@ -608,7 +683,7 @@
   children_ = move(new_children);
 }
 
-void RuntimeProfile::AddInfoString(const string& key, const string& value) {
+void RuntimeProfileBase::AddInfoString(const string& key, const string& value) {
   return AddInfoStringInternal(key, value, false);
 }
 
@@ -620,9 +695,8 @@
   return AddInfoStringInternal(key, value, true);
 }
 
-void RuntimeProfile::AddInfoStringInternal(const string& key, string value,
-    bool append, bool redact) {
-
+void RuntimeProfileBase::AddInfoStringInternal(
+    const string& key, string value, bool append, bool redact) {
   if (redact) Redact(&value);
 
   StripTrailingWhitespace(&value);
@@ -672,7 +746,6 @@
   RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name,           \
       TUnit::type unit, const string& parent_counter_name, bool* created) {      \
     counter_map_lock_.DCheckLocked();                                            \
-    DCHECK_EQ(is_averaged_profile_, false);                                      \
     if (counter_map_.find(name) != counter_map_.end()) {                         \
       *created = false;                                                          \
       return reinterpret_cast<T*>(counter_map_[name]);                           \
@@ -695,7 +768,6 @@
 RuntimeProfile::DerivedCounter* RuntimeProfile::AddDerivedCounter(
     const string& name, TUnit::type unit,
     const SampleFunction& counter_fn, const string& parent_counter_name) {
-  DCHECK_EQ(is_averaged_profile_, false);
   lock_guard<SpinLock> l(counter_map_lock_);
   if (counter_map_.find(name) != counter_map_.end()) return NULL;
   DerivedCounter* counter = pool_->Add(new DerivedCounter(unit, counter_fn));
@@ -730,7 +802,7 @@
   counter_map_[LOCAL_TIME_COUNTER_NAME] = local_time_counter;
 }
 
-RuntimeProfile::Counter* RuntimeProfile::GetCounter(const string& name) {
+RuntimeProfileBase::Counter* RuntimeProfileBase::GetCounter(const string& name) {
   lock_guard<SpinLock> l(counter_map_lock_);
   if (counter_map_.find(name) != counter_map_.end()) {
     return counter_map_[name];
@@ -738,7 +810,7 @@
   return NULL;
 }
 
-RuntimeProfile::SummaryStatsCounter* RuntimeProfile::GetSummaryStatsCounter(
+RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::GetSummaryStatsCounter(
     const string& name) {
   lock_guard<SpinLock> l(summary_stats_map_lock_);
   if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
@@ -747,7 +819,7 @@
   return nullptr;
 }
 
-void RuntimeProfile::GetCounters(const string& name, vector<Counter*>* counters) {
+void RuntimeProfileBase::GetCounters(const string& name, vector<Counter*>* counters) {
   Counter* c = GetCounter(name);
   if (c != NULL) counters->push_back(c);
 
@@ -765,17 +837,17 @@
   return it->second;
 }
 
-void RuntimeProfile::ToJson(Document* d) const{
+void RuntimeProfile::ToJson(Document* d) const {
   // queryObj that stores all JSON format profile information
   Value queryObj(kObjectType);
-  RuntimeProfile::ToJsonHelper(&queryObj, d);
+  ToJsonHelper(&queryObj, d);
   d->RemoveMember("contents");
   d->AddMember("contents", queryObj, d->GetAllocator());
 }
 
-void RuntimeProfile::ToJsonCounters(Value* parent, Document* d,
+void RuntimeProfileBase::ToJsonCounters(Value* parent, Document* d,
     const string& counter_name, const CounterMap& counter_map,
-    const ChildCounterMap& child_counter_map) const{
+    const ChildCounterMap& child_counter_map) const {
   auto& allocator = d->GetAllocator();
   ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
   if (itr != child_counter_map.end()) {
@@ -790,8 +862,8 @@
       counter.AddMember("counter_name", child_counter_json, allocator);
 
       Value child_counters_json(kArrayType);
-      RuntimeProfile::ToJsonCounters(&child_counters_json, d,
-          child_counter, counter_map,child_counter_map);
+      ToJsonCounters(
+          &child_counters_json, d, child_counter, counter_map, child_counter_map);
       if (!child_counters_json.Empty()){
         counter.AddMember("child_counters", child_counters_json, allocator);
       }
@@ -800,7 +872,7 @@
   }
 }
 
-void RuntimeProfile::ToJsonHelper(Value* parent, Document* d) const{
+void RuntimeProfileBase::ToJsonHelper(Value* parent, Document* d) const {
   Document::AllocatorType& allocator = d->GetAllocator();
   // Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock
   // while we call value() on the counters (some of those might be DerivedCounters).
@@ -839,7 +911,8 @@
     parent->AddMember("node_metadata", node_metadata_json, allocator);
   }
 
-  // 4. Info_strings
+  // 4. Info strings
+  // TODO: IMPALA-9382: move to subclass once 'info_strings_' is also moved
   {
     lock_guard<SpinLock> l(info_strings_lock_);
     if (!info_strings_.empty()) {
@@ -858,7 +931,41 @@
     }
   }
 
-  // 5. Events
+  // 5. Counters and info strings from subclasses
+  ToJsonSubclass(parent, d);
+
+  // 6. Counters
+  Value counters(kArrayType);
+  ToJsonCounters(&counters, d, ROOT_COUNTER, counter_map, child_counter_map);
+  if (!counters.Empty()) {
+    parent->AddMember("counters", counters, allocator);
+  }
+
+  // 7. Children Runtime Profiles
+  //
+  // Create copy of children_ so we don't need to hold lock while we call
+  // ToJsonHelper() on the children.
+  ChildVector children;
+  {
+    lock_guard<SpinLock> l(children_lock_);
+    children = children_;
+  }
+
+  if (!children.empty()) {
+    Value child_profiles(kArrayType);
+    for (int i = 0; i < children.size(); ++i) {
+      RuntimeProfileBase* profile = children[i].first;
+      Value child_profile(kObjectType);
+      profile->ToJsonHelper(&child_profile, d);
+      child_profiles.PushBack(child_profile, allocator);
+    }
+    parent->AddMember("child_profiles", child_profiles, allocator);
+  }
+}
+
+void RuntimeProfile::ToJsonSubclass(Value* parent, Document* d) const {
+  Document::AllocatorType& allocator = d->GetAllocator();
+  // 1. Events
   {
     lock_guard<SpinLock> l(event_sequence_lock_);
     if (!event_sequence_map_.empty()) {
@@ -873,32 +980,7 @@
     }
   }
 
-
-  // 6. Counters
-  Value counters(kArrayType);
-  RuntimeProfile::ToJsonCounters(&counters , d, "", counter_map, child_counter_map);
-  if (!counters.Empty()) {
-    parent->AddMember("counters", counters, allocator);
-  }
-
-  // 7. SummaryStatsCounter
-  {
-    lock_guard<SpinLock> l(summary_stats_map_lock_);
-    if (!summary_stats_map_.empty()) {
-      Value summary_stats_counters_json(kArrayType);
-      for (const SummaryStatsCounterMap::value_type& v : summary_stats_map_) {
-        Value summary_stats_counter(kObjectType);
-        Value summary_name_json(v.first.c_str(), v.first.size(), allocator);
-        v.second->ToJson(*d, &summary_stats_counter);
-        summary_stats_counter.AddMember("counter_name", summary_name_json, allocator);
-        summary_stats_counters_json.PushBack(summary_stats_counter, allocator);
-      }
-      parent->AddMember(
-          "summary_stats_counters", summary_stats_counters_json, allocator);
-    }
-  }
-
-  // 8. Time_series_counter_map
+  // 2. Time_series_counter_map
   {
     // Print all time series counters as following:
     //    - <Name> (<period>): <val1>, <val2>, <etc>
@@ -915,25 +997,20 @@
     }
   }
 
-  // 9. Children Runtime Profiles
-  //
-  // Create copy of children_ so we don't need to hold lock while we call
-  // ToJsonHelper() on the children.
-  ChildVector children;
+  // 3. SummaryStatsCounter
   {
-    lock_guard<SpinLock> l(children_lock_);
-    children = children_;
-  }
-
-  if (!children.empty()) {
-    Value child_profiles(kArrayType);
-    for (int i = 0; i < children.size(); ++i) {
-      RuntimeProfile* profile = children[i].first;
-      Value child_profile(kObjectType);
-      profile->ToJsonHelper(&child_profile, d);
-      child_profiles.PushBack(child_profile, allocator);
+    lock_guard<SpinLock> l(summary_stats_map_lock_);
+    if (!summary_stats_map_.empty()) {
+      Value summary_stats_counters_json(kArrayType);
+      for (const SummaryStatsCounterMap::value_type& v : summary_stats_map_) {
+        Value summary_stats_counter(kObjectType);
+        Value summary_name_json(v.first.c_str(), v.first.size(), allocator);
+        v.second->ToJson(*d, &summary_stats_counter);
+        summary_stats_counter.AddMember("counter_name", summary_name_json, allocator);
+        summary_stats_counters_json.PushBack(summary_stats_counter, allocator);
+      }
+      parent->AddMember("summary_stats_counters", summary_stats_counters_json, allocator);
     }
-    parent->AddMember("child_profiles", child_profiles, allocator);
   }
 }
 
@@ -942,7 +1019,7 @@
 //  2. Info Strings
 //  3. Counters
 //  4. Children
-void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
+void RuntimeProfileBase::PrettyPrint(ostream* s, const string& prefix) const {
   ostream& stream = *s;
 
   // Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock
@@ -955,25 +1032,26 @@
     child_counter_map = child_counter_map_;
   }
 
-  map<string, Counter*>::const_iterator total_time =
-      counter_map.find(TOTAL_TIME_COUNTER_NAME);
-  DCHECK(total_time != counter_map.end());
-
+  int num_input_profiles = GetNumInputProfiles();
+  Counter* total_time = total_time_counter();
+  DCHECK(total_time != nullptr);
   stream.flags(ios::fixed);
-  stream << prefix << name_ << ":";
-  if (total_time->second->value() != 0) {
-    int64_t local_time_frac_int = local_time_frac_.Load();
-    stream << "(Total: "
-           << PrettyPrinter::Print(total_time->second->value(),
-               total_time->second->unit())
+  stream << prefix << name_;
+  if (num_input_profiles != 1) {
+    stream << " [" << num_input_profiles << " instances]";
+  }
+  stream << ":";
+  if (total_time->value() != 0) {
+    stream << "(Total: " << PrettyPrinter::Print(total_time->value(), total_time->unit())
            << ", non-child: "
            << PrettyPrinter::Print(local_time_ns_.Load(), TUnit::TIME_NS)
-           << ", % non-child: "
-           << setprecision(2) << *reinterpret_cast<double*>(&local_time_frac_int) * 100
-           << "%)";
+           << ", % non-child: " << setprecision(2)
+           << BitcastFromInt64<double>(local_time_frac_.Load()) * 100 << "%)";
   }
   stream << endl;
 
+  // TODO: IMPALA-9382: move to RuntimeProfile::PrettyPrintInfoStrings() once we move
+  // 'info_strings_'.
   {
     lock_guard<SpinLock> l(info_strings_lock_);
     for (const string& key: info_strings_display_order_) {
@@ -981,6 +1059,29 @@
     }
   }
 
+  PrettyPrintInfoStrings(s, prefix);
+  PrettyPrintSubclassCounters(s, prefix);
+  RuntimeProfileBase::PrintChildCounters(
+      prefix, ROOT_COUNTER, counter_map, child_counter_map, s);
+
+  // Create copy of children_ so we don't need to hold lock while we call
+  // PrettyPrint() on the children.
+  ChildVector children;
+  {
+    lock_guard<SpinLock> l(children_lock_);
+    children = children_;
+  }
+  for (int i = 0; i < children.size(); ++i) {
+    RuntimeProfileBase* profile = children[i].first;
+    bool indent = children[i].second;
+    profile->PrettyPrint(s, prefix + (indent ? "  " : ""));
+  }
+}
+
+void RuntimeProfile::PrettyPrintInfoStrings(ostream* s, const string& prefix) const {}
+
+void RuntimeProfile::PrettyPrintSubclassCounters(ostream* s, const string& prefix) const {
+  ostream& stream = *s;
   {
     // Print all the event timers as the following:
     // <EventKey> Timeline: 2s719ms
@@ -990,7 +1091,7 @@
     // The times in parentheses are the time elapsed since the last event.
     vector<EventSequence::Event> events;
     lock_guard<SpinLock> l(event_sequence_lock_);
-    for (const EventSequenceMap::value_type& event_sequence: event_sequence_map_) {
+    for (const auto& event_sequence : event_sequence_map_) {
       // If the stopwatch has never been started (e.g. because this sequence came from
       // Thrift), look for the last element to tell us the total runtime. For
       // currently-updating sequences, it's better to use the stopwatch value because that
@@ -1018,7 +1119,7 @@
     // Print all time series counters as following:
     //    - <Name> (<period>): <val1>, <val2>, <etc>
     lock_guard<SpinLock> l(counter_map_lock_);
-    for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
+    for (const auto& v : time_series_counter_map_) {
       const TimeSeriesCounter* counter = v.second;
       lock_guard<SpinLock> l(counter->lock_);
       int num, period;
@@ -1048,38 +1149,10 @@
     // Print all SummaryStatsCounters as following:
     // <Name>: (Avg: <value> ; Min: <min_value> ; Max: <max_value> ;
     // Number of samples: <total>)
-    for (const SummaryStatsCounterMap::value_type& v: summary_stats_map_) {
-      if (v.second->TotalNumValues() == 0) {
-        // No point printing all the stats if number of samples is zero.
-        stream << prefix << "   - " << v.first << ": "
-               << PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
-               << " (Number of samples: " << v.second->TotalNumValues() << ")" << endl;
-      } else {
-        stream << prefix << "   - " << v.first << ": (Avg: "
-               << PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
-               << " ; Min: "
-               << PrettyPrinter::Print(v.second->MinValue(), v.second->unit(), true)
-               << " ; Max: "
-               << PrettyPrinter::Print(v.second->MaxValue(), v.second->unit(), true)
-               << " ; Number of samples: " << v.second->TotalNumValues() << ")" << endl;
-      }
+    for (const auto& v : summary_stats_map_) {
+      v.second->PrettyPrint(prefix, v.first, s);
     }
   }
-  RuntimeProfile::PrintChildCounters(
-      prefix, ROOT_COUNTER, counter_map, child_counter_map, s);
-
-  // Create copy of children_ so we don't need to hold lock while we call
-  // PrettyPrint() on the children.
-  ChildVector children;
-  {
-    lock_guard<SpinLock> l(children_lock_);
-    children = children_;
-  }
-  for (int i = 0; i < children.size(); ++i) {
-    RuntimeProfile* profile = children[i].first;
-    bool indent = children[i].second;
-    profile->PrettyPrint(s, prefix + (indent ? "  " : ""));
-  }
 }
 
 Status RuntimeProfile::Compress(vector<uint8_t>* out) const {
@@ -1182,11 +1255,12 @@
 
 void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const {
   tree->nodes.clear();
-  ToThrift(&tree->nodes);
+  ToThriftHelper(&tree->nodes);
   ExecSummaryToThrift(tree);
+  if (FLAGS_gen_experimental_profile) tree->__set_profile_version(2);
 }
 
-void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
+void RuntimeProfileBase::ToThriftHelper(vector<TRuntimeProfileNode>* nodes) const {
   // Use a two-pass approach where we first collect nodes with a pre-order traversal and
   // then serialize them. This is done to allow reserving the full vector of
   // TRuntimeProfileNodes upfront - copying the constructed nodes when resizing the vector
@@ -1198,13 +1272,13 @@
   for (CollectedNode& preorder_node : preorder_nodes) {
     nodes->emplace_back();
     TRuntimeProfileNode& node = nodes->back();
-    preorder_node.node->ToThrift(&node);
+    preorder_node.node->ToThriftHelper(&node);
     node.indent = preorder_node.indent;
     node.num_children = preorder_node.num_children;
   }
 }
 
-void RuntimeProfile::ToThrift(TRuntimeProfileNode* out_node) const {
+void RuntimeProfileBase::ToThriftHelper(TRuntimeProfileNode* out_node) const {
   // Use a reference to reduce code churn. TODO: clean this up to use a pointer later.
   TRuntimeProfileNode& node = *out_node;
   node.name = name_;
@@ -1228,40 +1302,52 @@
     std::copy(counter_map_.begin(), counter_map_.end(),
         std::back_inserter(counter_map_entries));
     node.child_counters_map = child_counter_map_;
-
-    if (time_series_counter_map_.size() != 0) {
-      node.__isset.time_series_counters = true;
-      node.time_series_counters.reserve(time_series_counter_map_.size());
-      for (const auto& val : time_series_counter_map_) {
-        node.time_series_counters.emplace_back();
-        val.second->ToThrift(&node.time_series_counters.back());
-      }
-    }
   }
-  node.counters.reserve(counter_map_entries.size());
+
+  // TODO: IMPALA-9382: move to RuntimeProfile::ToThriftSubclass() once 'info_strings_'
+  // is moved.
+  {
+    lock_guard<SpinLock> l(info_strings_lock_);
+    out_node->info_strings = info_strings_;
+    out_node->info_strings_display_order = info_strings_display_order_;
+  }
+
+  ToThriftSubclass(counter_map_entries, out_node);
+}
+
+void RuntimeProfile::ToThriftSubclass(
+    vector<pair<const string&, const Counter*>>& counter_map_entries,
+    TRuntimeProfileNode* out_node) const {
+  out_node->counters.reserve(counter_map_entries.size());
   for (const auto& entry : counter_map_entries) {
-    node.counters.emplace_back();
-    TCounter& counter = node.counters.back();
+    out_node->counters.emplace_back();
+    TCounter& counter = out_node->counters.back();
     counter.name = entry.first;
     counter.value = entry.second->value();
     counter.unit = entry.second->unit();
   }
 
   {
-    lock_guard<SpinLock> l(info_strings_lock_);
-    node.info_strings = info_strings_;
-    node.info_strings_display_order = info_strings_display_order_;
+    lock_guard<SpinLock> l(counter_map_lock_);
+    if (time_series_counter_map_.size() != 0) {
+      out_node->__isset.time_series_counters = true;
+      out_node->time_series_counters.reserve(time_series_counter_map_.size());
+      for (const auto& val : time_series_counter_map_) {
+        out_node->time_series_counters.emplace_back();
+        val.second->ToThrift(&out_node->time_series_counters.back());
+      }
+    }
   }
 
   {
     vector<EventSequence::Event> events;
     lock_guard<SpinLock> l(event_sequence_lock_);
     if (event_sequence_map_.size() != 0) {
-      node.__isset.event_sequences = true;
-      node.event_sequences.reserve(event_sequence_map_.size());
+      out_node->__isset.event_sequences = true;
+      out_node->event_sequences.reserve(event_sequence_map_.size());
       for (const auto& val : event_sequence_map_) {
-        node.event_sequences.emplace_back();
-        TEventSequence& seq = node.event_sequences.back();
+        out_node->event_sequences.emplace_back();
+        TEventSequence& seq = out_node->event_sequences.back();
         seq.name = val.first;
         val.second->GetEvents(&events);
         seq.labels.reserve(events.size());
@@ -1277,17 +1363,17 @@
   {
     lock_guard<SpinLock> l(summary_stats_map_lock_);
     if (summary_stats_map_.size() != 0) {
-      node.__isset.summary_stats_counters = true;
-      node.summary_stats_counters.resize(summary_stats_map_.size());
+      out_node->__isset.summary_stats_counters = true;
+      out_node->summary_stats_counters.resize(summary_stats_map_.size());
       int idx = 0;
       for (const SummaryStatsCounterMap::value_type& val: summary_stats_map_) {
-        val.second->ToThrift(&node.summary_stats_counters[idx++], val.first);
+        val.second->ToThrift(&out_node->summary_stats_counters[idx++], val.first);
       }
     }
   }
 }
 
-void RuntimeProfile::CollectNodes(bool indent, vector<CollectedNode>* nodes) const {
+void RuntimeProfileBase::CollectNodes(bool indent, vector<CollectedNode>* nodes) const {
   lock_guard<SpinLock> l(children_lock_);
   nodes->emplace_back(this, indent, children_.size());
   for (const auto& child : children_) {
@@ -1316,7 +1402,7 @@
 }
 
 int64_t RuntimeProfile::UnitsPerSecond(
-    const RuntimeProfile::Counter* total_counter, const RuntimeProfile::Counter* timer) {
+    const Counter* total_counter, const Counter* timer) {
   DCHECK(total_counter->unit() == TUnit::BYTES || total_counter->unit() == TUnit::UNIT);
   DCHECK(timer->unit() == TUnit::TIME_NS);
 
@@ -1333,7 +1419,7 @@
   return value;
 }
 
-RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
+RuntimeProfileBase::Counter* RuntimeProfile::AddRateCounter(
     const string& name, Counter* src_counter) {
   TUnit::type dst_unit;
   switch (src_counter->unit()) {
@@ -1350,7 +1436,7 @@
   {
     lock_guard<SpinLock> l(counter_map_lock_);
     bool created;
-    Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+    Counter* dst_counter = AddCounterLocked(name, dst_unit, ROOT_COUNTER, &created);
     if (!created) return dst_counter;
     rate_counters_.push_back(dst_counter);
     PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
@@ -1360,11 +1446,11 @@
   }
 }
 
-RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
+RuntimeProfileBase::Counter* RuntimeProfile::AddRateCounter(
     const string& name, SampleFunction fn, TUnit::type dst_unit) {
   lock_guard<SpinLock> l(counter_map_lock_);
   bool created;
-  Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
+  Counter* dst_counter = AddCounterLocked(name, dst_unit, ROOT_COUNTER, &created);
   if (!created) return dst_counter;
   rate_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter,
@@ -1373,12 +1459,13 @@
   return dst_counter;
 }
 
-RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
+RuntimeProfileBase::Counter* RuntimeProfile::AddSamplingCounter(
     const string& name, Counter* src_counter) {
   DCHECK(src_counter->unit() == TUnit::UNIT);
   lock_guard<SpinLock> l(counter_map_lock_);
   bool created;
-  Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+  Counter* dst_counter =
+      AddCounterLocked(name, TUnit::DOUBLE_VALUE, ROOT_COUNTER, &created);
   if (!created) return dst_counter;
   sampling_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
@@ -1387,11 +1474,12 @@
   return dst_counter;
 }
 
-RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
+RuntimeProfileBase::Counter* RuntimeProfile::AddSamplingCounter(
     const string& name, SampleFunction sample_fn) {
   lock_guard<SpinLock> l(counter_map_lock_);
   bool created;
-  Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
+  Counter* dst_counter =
+      AddCounterLocked(name, TUnit::DOUBLE_VALUE, ROOT_COUNTER, &created);
   if (!created) return dst_counter;
   sampling_counters_.push_back(dst_counter);
   PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter,
@@ -1400,13 +1488,12 @@
   return dst_counter;
 }
 
-vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters(
+vector<RuntimeProfileBase::Counter*>* RuntimeProfile::AddBucketingCounters(
     Counter* src_counter, int num_buckets) {
   lock_guard<SpinLock> l(counter_map_lock_);
-  vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>);
+  vector<Counter*>* buckets = pool_->Add(new vector<Counter*>);
   for (int i = 0; i < num_buckets; ++i) {
-      buckets->push_back(
-          pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
+    buckets->push_back(pool_->Add(new Counter(TUnit::DOUBLE_VALUE, 0)));
   }
   bucketing_counters_.insert(buckets);
   has_active_periodic_counters_ = true;
@@ -1435,7 +1522,7 @@
   return timer;
 }
 
-void RuntimeProfile::PrintChildCounters(const string& prefix,
+void RuntimeProfileBase::PrintChildCounters(const string& prefix,
     const string& counter_name, const CounterMap& counter_map,
     const ChildCounterMap& child_counter_map, ostream* s) {
   ostream& stream = *s;
@@ -1445,18 +1532,15 @@
     for (const string& child_counter: child_counters) {
       CounterMap::const_iterator iter = counter_map.find(child_counter);
       if (iter == counter_map.end()) continue;
-      stream << prefix << "   - " << iter->first << ": "
-             << PrettyPrinter::Print(iter->second->value(), iter->second->unit(), true)
-             << endl;
-      RuntimeProfile::PrintChildCounters(prefix + "  ", child_counter, counter_map,
-          child_counter_map, s);
+      iter->second->PrettyPrint(prefix, iter->first, &stream);
+      RuntimeProfileBase::PrintChildCounters(
+          prefix + "  ", child_counter, counter_map, child_counter_map, s);
     }
   }
 }
 
-RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
+RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
     const string& name, TUnit::type unit, const std::string& parent_counter_name) {
-  DCHECK_EQ(is_averaged_profile_, false);
   lock_guard<SpinLock> l(summary_stats_map_lock_);
   if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
     return summary_stats_map_[name];
@@ -1589,11 +1673,7 @@
   return counter;
 }
 
-void RuntimeProfile::ClearChunkedTimeSeriesCounters() {
-  {
-    lock_guard<SpinLock> l(counter_map_lock_);
-    for (auto& it : time_series_counter_map_) it.second->Clear();
-  }
+void RuntimeProfileBase::ClearChunkedTimeSeriesCounters() {
   {
     lock_guard<SpinLock> l(children_lock_);
     for (int i = 0; i < children_.size(); ++i) {
@@ -1602,6 +1682,14 @@
   }
 }
 
+void RuntimeProfile::ClearChunkedTimeSeriesCounters() {
+  {
+    lock_guard<SpinLock> l(counter_map_lock_);
+    for (auto& it : time_series_counter_map_) it.second->Clear();
+  }
+  RuntimeProfileBase::ClearChunkedTimeSeriesCounters();
+}
+
 void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
   lock_guard<SpinLock> l(lock_);
   int num, period;
@@ -1626,8 +1714,153 @@
   }
 }
 
-void RuntimeProfile::SummaryStatsCounter::ToThrift(TSummaryStatsCounter* counter,
-    const std::string& name) {
+RuntimeProfileBase::AveragedCounter::AveragedCounter(TUnit::type unit, int num_samples)
+  : Counter(unit),
+    num_values_(num_samples),
+    has_value_(make_unique<AtomicBool[]>(num_samples)),
+    values_(make_unique<AtomicInt64[]>(num_samples)) {}
+
+RuntimeProfileBase::AveragedCounter::AveragedCounter(TUnit::type unit,
+    const std::vector<bool>& has_value, const std::vector<int64_t>& values)
+  : Counter(unit),
+    num_values_(values.size()),
+    has_value_(make_unique<AtomicBool[]>(values.size())),
+    values_(make_unique<AtomicInt64[]>(values.size())) {
+  DCHECK_EQ(has_value.size(), values.size());
+  for (int i = 0; i < values.size(); ++i) {
+    if (has_value[i]) {
+      has_value_[i].Store(true);
+      values_[i].Store(values[i]);
+    }
+  }
+}
+
+void RuntimeProfileBase::AveragedCounter::UpdateCounter(Counter* new_counter, int idx) {
+  DCHECK_EQ(new_counter->unit(), unit_);
+  DCHECK_GE(idx, 0);
+  DCHECK_LT(idx, num_values_);
+  if (unit_ == TUnit::DOUBLE_VALUE) {
+    double new_val = new_counter->double_value();
+    values_[idx].Store(BitcastToInt64(new_val));
+  } else {
+    values_[idx].Store(new_counter->value());
+  }
+  // Set has_value_ after the value is valid above so that readers won't
+  // see invalid values.
+  has_value_[idx].Store(true);
+}
+
+int64_t RuntimeProfileBase::AveragedCounter::value() const {
+  return unit_ == TUnit::DOUBLE_VALUE ? ComputeMean<double>() : ComputeMean<int64_t>();
+}
+
+template <typename T>
+int64_t RuntimeProfileBase::AveragedCounter::ComputeMean() const {
+  // Compute the mean in a single pass over the input. We could instead call GetStats(),
+  // but this would add some additional overhead when serializing thrift profiles, which
+  // include the mean counter values.
+  T sum = 0;
+  int num_vals = 0;
+  for (int i = 0; i < num_values_; ++i) {
+    if (has_value_[i].Load()) {
+      sum += BitcastFromInt64<T>(values_[i].Load());
+      ++num_vals;
+    }
+  }
+  if (num_vals == 0) return 0; // Avoid divide-by-zero.
+  return BitcastToInt64(sum / num_vals);
+}
+
+template <typename T>
+RuntimeProfileBase::AveragedCounter::Stats<T>
+RuntimeProfileBase::AveragedCounter::GetStats() const {
+  static_assert(std::is_same<T, int64_t>::value || std::is_same<T, double>::value,
+      "Only double and int64_t are supported");
+  DCHECK_EQ(unit_ == TUnit::DOUBLE_VALUE, (std::is_same<T, double>::value));
+  vector<T> vals;
+  vals.reserve(num_values_);
+  for (int i = 0; i < num_values_; ++i) {
+    if (has_value_[i].Load()) vals.push_back(BitcastFromInt64<T>(values_[i].Load()));
+  }
+  Stats<T> result;
+
+  if (!vals.empty()) {
+    sort(vals.begin(), vals.end());
+    result.num_vals = vals.size();
+    result.mean = std::accumulate(vals.begin(), vals.end(), T(0)) / result.num_vals;
+    result.min = vals[0];
+    result.max = vals.back();
+    int end_idx = vals.size() - 1;
+    result.p50 = vals[end_idx / 2];
+    result.p75 = vals[end_idx * 3 / 4];
+    result.p90 = vals[end_idx * 9 / 10];
+    result.p95 = vals[end_idx * 19 / 20];
+  }
+  return result;
+}
+
+void RuntimeProfileBase::AveragedCounter::PrettyPrint(
+    const string& prefix, const string& name, ostream* s) const {
+  if (unit_ == TUnit::DOUBLE_VALUE) {
+    PrettyPrintImpl<double>(prefix, name, s);
+  } else {
+    PrettyPrintImpl<int64_t>(prefix, name, s);
+  }
+}
+
+template <typename T>
+void RuntimeProfileBase::AveragedCounter::PrettyPrintImpl(
+    const string& prefix, const string& name, ostream* s) const {
+  Stats<T> stats = GetStats<T>();
+  (*s) << prefix << "   - " << name << ": ";
+  if (!FLAGS_gen_experimental_profile || stats.num_vals == 1) {
+    (*s) << PrettyPrinter::Print(stats.mean, unit_, true) << endl;
+    return;
+  }
+
+  // For counters whose number of values is not 1, show summary stats and the values.
+  (*s) << "mean=" << PrettyPrinter::Print(BitcastToInt64(stats.mean), unit_, true)
+       << " min=" << PrettyPrinter::Print(BitcastToInt64(stats.min), unit_, true)
+       << " p50=" << PrettyPrinter::Print(BitcastToInt64(stats.p50), unit_, true)
+       << " p75=" << PrettyPrinter::Print(BitcastToInt64(stats.p75), unit_, true)
+       << " p90=" << PrettyPrinter::Print(BitcastToInt64(stats.p90), unit_, true)
+       << " p95=" << PrettyPrinter::Print(BitcastToInt64(stats.p95), unit_, true)
+       << " max=" << PrettyPrinter::Print(BitcastToInt64(stats.max), unit_, true) << endl;
+  // Dump out individual values if they are not all identical.
+  if (stats.min != stats.max) {
+    (*s) << prefix << "     [";
+    for (int i = 0; i < num_values_; ++i) {
+      if (i != 0) {
+        if (i % 8 == 0) {
+          (*s) << ",\n" << prefix << "      ";
+        } else {
+          (*s) << ", ";
+        }
+      }
+      if (has_value_[i].Load()) {
+        (*s) << PrettyPrinter::Print(values_[i].Load(), unit_, false);
+      } else {
+        (*s) << "_";
+      }
+    }
+    (*s) << "]\n";
+  }
+}
+
+void RuntimeProfileBase::AveragedCounter::ToThrift(
+    const string& name, TAggCounter* tcounter) const {
+  tcounter->name = name;
+  tcounter->unit = unit_;
+  tcounter->has_value.resize(num_values_);
+  tcounter->values.resize(num_values_);
+  for (int i = 0; i < num_values_; ++i) {
+    tcounter->has_value[i] = has_value_[i].Load();
+    tcounter->values[i] = values_[i].Load();
+  }
+}
+
+void RuntimeProfileBase::SummaryStatsCounter::ToThrift(
+    TSummaryStatsCounter* counter, const std::string& name) {
   lock_guard<SpinLock> l(lock_);
   counter->name = name;
   counter->unit = unit_;
@@ -1637,7 +1870,30 @@
   counter->max_value = max_;
 }
 
-void RuntimeProfile::SummaryStatsCounter::UpdateCounter(int64_t new_value) {
+void RuntimeProfileBase::SummaryStatsCounter::ToThrift(const string& name,
+    TUnit::type unit, const vector<SummaryStatsCounter*>& counters,
+    TAggSummaryStatsCounter* tcounter) {
+  int num_vals = counters.size();
+  tcounter->name = name;
+  tcounter->unit = unit;
+  tcounter->has_value.resize(num_vals);
+  tcounter->sum.resize(num_vals);
+  tcounter->total_num_values.resize(num_vals);
+  tcounter->min_value.resize(num_vals);
+  tcounter->max_value.resize(num_vals);
+  for (int i = 0; i < num_vals; ++i) {
+    SummaryStatsCounter* counter = counters[i];
+    if (counter == nullptr) continue;
+    lock_guard<SpinLock> l(counter->lock_);
+    tcounter->has_value[i] = true;
+    tcounter->sum[i] = counter->sum_;
+    tcounter->min_value[i] = counter->min_;
+    tcounter->max_value[i] = counter->max_;
+    tcounter->total_num_values[i] = counter->total_num_values_;
+  }
+}
+
+void RuntimeProfileBase::SummaryStatsCounter::UpdateCounter(int64_t new_value) {
   lock_guard<SpinLock> l(lock_);
 
   ++total_num_values_;
@@ -1648,7 +1904,8 @@
   if (new_value > max_) max_ = new_value;
 }
 
-void RuntimeProfile::SummaryStatsCounter::SetStats(const TSummaryStatsCounter& counter) {
+void RuntimeProfileBase::SummaryStatsCounter::SetStats(
+    const TSummaryStatsCounter& counter) {
   // We drop this input if it looks malformed.
   if (counter.total_num_values < 0) return;
   lock_guard<SpinLock> l(lock_);
@@ -1661,22 +1918,67 @@
   value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
 }
 
-int64_t RuntimeProfile::SummaryStatsCounter::MinValue() {
+void RuntimeProfileBase::SummaryStatsCounter::SetStats(const SummaryStatsCounter& other) {
+  lock_guard<SpinLock> ol(other.lock_);
+  lock_guard<SpinLock> l(lock_);
+  DCHECK_EQ(unit_, other.unit_);
+  total_num_values_ = other.total_num_values_;
+  min_ = other.min_;
+  max_ = other.max_;
+  sum_ = other.sum_;
+  value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
+}
+
+void RuntimeProfileBase::SummaryStatsCounter::Merge(const SummaryStatsCounter& other) {
+  lock_guard<SpinLock> ol(other.lock_);
+  lock_guard<SpinLock> l(lock_);
+  total_num_values_ += other.total_num_values_;
+  min_ = min(min_, other.min_);
+  max_ = max(max_, other.max_);
+  sum_ += other.sum_;
+  value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
+}
+
+int64_t RuntimeProfileBase::SummaryStatsCounter::MinValue() {
   lock_guard<SpinLock> l(lock_);
   return min_;
 }
 
-int64_t RuntimeProfile::SummaryStatsCounter::MaxValue() {
+int64_t RuntimeProfileBase::SummaryStatsCounter::MaxValue() {
   lock_guard<SpinLock> l(lock_);
   return max_;
 }
 
-int32_t RuntimeProfile::SummaryStatsCounter::TotalNumValues() {
+int32_t RuntimeProfileBase::SummaryStatsCounter::TotalNumValues() {
   lock_guard<SpinLock> l(lock_);
   return total_num_values_;
 }
 
-void RuntimeProfile::Counter::ToJson(Document& document, Value* val) const {
+void RuntimeProfileBase::SummaryStatsCounter::PrettyPrint(
+    const string& prefix, const string& name, ostream* s) const {
+  ostream& stream = *s;
+  stream << prefix << "   - " << name << ": ";
+  lock_guard<SpinLock> l(lock_);
+  if (total_num_values_ == 0) {
+    // No point printing all the stats if number of samples is zero.
+    stream << PrettyPrinter::Print(value_.Load(), unit_, true)
+           << " (Number of samples: " << total_num_values_ << ")";
+  } else {
+    stream << "(Avg: " << PrettyPrinter::Print(value_.Load(), unit_, true)
+           << " ; Min: " << PrettyPrinter::Print(min_, unit_, true)
+           << " ; Max: " << PrettyPrinter::Print(max_, unit_, true)
+           << " ; Number of samples: " << total_num_values_ << ")";
+  }
+  stream << endl;
+}
+
+void RuntimeProfileBase::Counter::PrettyPrint(
+    const string& prefix, const string& name, ostream* s) const {
+  (*s) << prefix << "   - " << name << ": " << PrettyPrinter::Print(value(), unit_, true)
+       << endl;
+}
+
+void RuntimeProfileBase::Counter::ToJson(Document& document, Value* val) const {
   Value counter_json(kObjectType);
   counter_json.AddMember("value", value(), document.GetAllocator());
   auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_);
@@ -1739,4 +2041,229 @@
   *value = event_sequence_json;
 }
 
+AggregatedRuntimeProfile::AggregatedRuntimeProfile(
+    ObjectPool* pool, const string& name, int num_input_profiles, bool is_root)
+  : RuntimeProfileBase(pool, name), num_input_profiles_(num_input_profiles) {
+  DCHECK_GE(num_input_profiles, 0);
+  if (is_root) input_profile_names_.resize(num_input_profiles);
+  set<string>& root_counters = child_counter_map_[ROOT_COUNTER];
+  Counter* total_time_counter =
+      pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles));
+  Counter* inactive_timer =
+      pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles));
+  counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter;
+  root_counters.emplace(TOTAL_TIME_COUNTER_NAME);
+  counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer;
+  root_counters.emplace(INACTIVE_TIME_COUNTER_NAME);
+}
+
+AggregatedRuntimeProfile* AggregatedRuntimeProfile::Create(
+    ObjectPool* pool, const string& name, int num_input_profiles, bool is_root) {
+  return pool->Add(new AggregatedRuntimeProfile(pool, name, num_input_profiles, is_root));
+}
+
+void AggregatedRuntimeProfile::InitFromThrift(
+    const TRuntimeProfileNode& node, ObjectPool* pool) {
+  DCHECK(node.__isset.aggregated);
+  DCHECK(node.aggregated.__isset.counters);
+
+  if (node.aggregated.__isset.input_profiles) {
+    input_profile_names_ = node.aggregated.input_profiles;
+  }
+  for (const TAggCounter& counter : node.aggregated.counters) {
+    counter_map_[counter.name] =
+        pool->Add(new AveragedCounter(counter.unit, counter.has_value, counter.values));
+  }
+  DCHECK(node.aggregated.__isset.info_strings);
+  for (const auto& entry : node.aggregated.info_strings) {
+    vector<string>& per_instance_vals = agg_info_strings_[entry.first];
+    if (per_instance_vals.empty()) {
+      per_instance_vals.resize(num_input_profiles_);
+    }
+    for (const auto& distinct_val_entry : entry.second) {
+      for (int32_t idx : distinct_val_entry.second) {
+        per_instance_vals[idx] = distinct_val_entry.first;
+      }
+    }
+  }
+
+  if (node.aggregated.__isset.summary_stats_counters) {
+    for (const TAggSummaryStatsCounter& val : node.aggregated.summary_stats_counters) {
+      DCHECK_GT(num_input_profiles_, 0);
+      DCHECK_EQ(val.sum.size(), num_input_profiles_);
+      DCHECK_EQ(val.total_num_values.size(), num_input_profiles_);
+      DCHECK_EQ(val.min_value.size(), num_input_profiles_);
+      DCHECK_EQ(val.max_value.size(), num_input_profiles_);
+      auto& entry = summary_stats_map_[val.name];
+      entry.first = val.unit;
+      vector<SummaryStatsCounter*>& instance_counters = entry.second;
+      instance_counters.resize(num_input_profiles_);
+      for (int idx = 0; idx < num_input_profiles_; ++idx) {
+        if (!val.has_value[idx]) continue;
+        instance_counters[idx] =
+            pool->Add(new SummaryStatsCounter(val.unit, val.total_num_values[idx],
+                val.min_value[idx], val.max_value[idx], val.sum[idx]));
+      }
+    }
+  }
+}
+
+// Print a sorted vector of indices in compressed form with runs of consecutive
+// indices printed as ranges. E.g. [1, 2, 3, 4, 6] would result in "1-4,6".
+static void PrettyPrintIndexRanges(ostream* s, const vector<int32_t>& indices) {
+  if (indices.empty()) return;
+  ostream& stream = *s;
+  int32_t start_idx = indices[0];
+  int32_t prev_idx = indices[0];
+  for (int i = 0; i < indices.size(); ++i) {
+    int32_t idx = indices[i];
+    if (idx > prev_idx + 1) {
+      // Start of a new range. Print the previous range of values.
+      stream << start_idx;
+      if (start_idx < prev_idx) stream << "-" << prev_idx;
+      stream << ",";
+      start_idx = idx;
+    }
+    prev_idx = idx;
+  }
+  // Print the last range of values.
+  stream << start_idx;
+  if (start_idx < prev_idx) stream << "-" << prev_idx;
+}
+void AggregatedRuntimeProfile::PrettyPrintInfoStrings(
+    ostream* s, const string& prefix) const {
+  // Aggregated info strings are only shown in experimental profile.
+  if (!FLAGS_gen_experimental_profile) return;
+  ostream& stream = *s;
+  {
+    lock_guard<SpinLock> l(input_profile_name_lock_);
+    if (!input_profile_names_.empty()) {
+      // TODO: IMPALA-9382: improve pretty-printing here
+      stream << prefix
+             << "Instances: " << boost::algorithm::join(input_profile_names_, ", ")
+             << endl;
+    }
+  }
+
+  {
+    lock_guard<SpinLock> l(agg_info_strings_lock_);
+    for (const auto& entry : agg_info_strings_) {
+      map<string, vector<int32_t>> distinct_vals = GroupDistinctInfoStrings(entry.second);
+      for (const auto& distinct_entry : distinct_vals) {
+        stream << prefix << "  " << entry.first;
+        stream << "[";
+        PrettyPrintIndexRanges(s, distinct_entry.second);
+        stream << "]: " << distinct_entry.first << endl;
+      }
+    }
+  }
+}
+
+void AggregatedRuntimeProfile::PrettyPrintSubclassCounters(
+    ostream* s, const string& prefix) const {
+  // Hide aggregated state when we are not using the transposed profile
+  // format.
+  if (!FLAGS_gen_experimental_profile) return;
+  {
+    lock_guard<SpinLock> l(summary_stats_map_lock_);
+    for (const auto& v : summary_stats_map_) {
+      // Display fully aggregated stats first.
+      SummaryStatsCounter aggregated_stats(v.second.first);
+      AggregateSummaryStats(v.second.second, &aggregated_stats);
+      aggregated_stats.PrettyPrint(prefix, v.first, s);
+      // Display per-instance stats, if there is more than one instance.
+      if (v.second.second.size() > 1) {
+        for (int idx = 0; idx < v.second.second.size(); ++idx) {
+          if (v.second.second[idx] == nullptr) continue;
+          const string& per_instance_prefix = Substitute("$0[$1]", prefix, idx);
+          v.second.second[idx]->PrettyPrint(per_instance_prefix, v.first, s);
+        }
+      }
+    }
+  }
+}
+
+void AggregatedRuntimeProfile::ToThriftSubclass(
+    vector<pair<const string&, const Counter*>>& counter_map_entries,
+    TRuntimeProfileNode* out_node) const {
+  out_node->__isset.aggregated = true;
+  out_node->aggregated.__set_num_instances(num_input_profiles_);
+  if (!input_profile_names_.empty()) {
+    out_node->aggregated.__isset.input_profiles = true;
+    out_node->aggregated.input_profiles = input_profile_names_;
+  }
+
+  // Populate both the aggregated counters, which contain the full information from
+  // the counters, and the regular counters, which can be understood by older
+  // readers.
+  out_node->counters.reserve(counter_map_entries.size());
+  out_node->aggregated.__isset.counters = true;
+  out_node->aggregated.counters.reserve(counter_map_entries.size());
+  for (const auto& entry : counter_map_entries) {
+    out_node->counters.emplace_back();
+    TCounter& counter = out_node->counters.back();
+    counter.name = entry.first;
+    counter.value = entry.second->value();
+    counter.unit = entry.second->unit();
+
+    const AveragedCounter* avg_counter =
+        dynamic_cast<const AveragedCounter*>(entry.second);
+    DCHECK(avg_counter != nullptr) << entry.first;
+    out_node->aggregated.counters.emplace_back();
+    avg_counter->ToThrift(entry.first, &out_node->aggregated.counters.back());
+  }
+
+  out_node->aggregated.__isset.info_strings = true;
+  {
+    lock_guard<SpinLock> l(agg_info_strings_lock_);
+    for (const auto& entry : agg_info_strings_) {
+      out_node->aggregated.info_strings[entry.first] =
+          GroupDistinctInfoStrings(entry.second);
+    }
+  }
+
+  out_node->__isset.summary_stats_counters = true;
+  {
+    lock_guard<SpinLock> l(summary_stats_map_lock_);
+    out_node->summary_stats_counters.resize(summary_stats_map_.size());
+    out_node->aggregated.__isset.summary_stats_counters = true;
+    out_node->aggregated.summary_stats_counters.resize(summary_stats_map_.size());
+    int counter_idx = 0;
+    for (const auto& entry : summary_stats_map_) {
+      DCHECK_GT(entry.second.second.size(), 0)
+          << "no counters can be created w/o instances";
+      DCHECK_EQ(num_input_profiles_, entry.second.second.size());
+      SummaryStatsCounter::ToThrift(entry.first, entry.second.first, entry.second.second,
+          &out_node->aggregated.summary_stats_counters[counter_idx]);
+      // Compute summarized stats to include at the top level.
+      SummaryStatsCounter aggregated_stats(entry.second.first);
+      AggregateSummaryStats(entry.second.second, &aggregated_stats);
+      aggregated_stats.ToThrift(
+          &out_node->summary_stats_counters[counter_idx++], entry.first);
+    }
+  }
+}
+
+void AggregatedRuntimeProfile::ToJsonSubclass(Value* parent, Document* d) const {
+  // TODO: IMPALA-9382: add JSON aggregated profile.
+}
+
+map<string, vector<int32_t>> AggregatedRuntimeProfile::GroupDistinctInfoStrings(
+    const vector<string>& info_string_values) const {
+  map<string, vector<int32_t>> distinct_vals;
+  DCHECK_EQ(info_string_values.size(), num_input_profiles_);
+  for (int idx = 0; idx < num_input_profiles_; ++idx) {
+    if (!info_string_values[idx].empty()) {
+      distinct_vals[info_string_values[idx]].push_back(idx);
+    }
+  }
+  return distinct_vals;
+}
+
+void AggregatedRuntimeProfile::AggregateSummaryStats(
+    const vector<SummaryStatsCounter*> counters, SummaryStatsCounter* result) {
+  for (SummaryStatsCounter* counter : counters) {
+    if (counter != nullptr) result->Merge(*counter);
+  }
+}
 }
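Reviewer note on PrettyPrintIndexRanges() above: the range compression can be sketched outside the patch as a small standalone function. This is only an illustration, assuming the input is sorted in ascending order; FormatIndexRanges is a hypothetical name that does not exist in the Impala source, and it returns a string instead of writing to an ostream so that it is easy to check in isolation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical standalone helper (not part of the patch): formats a sorted list of
// indices, collapsing each run of consecutive values into a "first-last" range.
std::string FormatIndexRanges(const std::vector<int32_t>& indices) {
  if (indices.empty()) return "";
  // Assumption: the caller passes indices in ascending order.
  assert(std::is_sorted(indices.begin(), indices.end()));
  std::ostringstream out;
  int32_t start_idx = indices[0];
  int32_t prev_idx = indices[0];
  for (std::size_t i = 1; i < indices.size(); ++i) {
    const int32_t idx = indices[i];
    if (idx > prev_idx + 1) {
      // A gap ends the current run: emit it, then start a new run at 'idx'.
      out << start_idx;
      if (start_idx < prev_idx) out << "-" << prev_idx;
      out << ",";
      start_idx = idx;
    }
    prev_idx = idx;
  }
  // Emit the final run.
  out << start_idx;
  if (start_idx < prev_idx) out << "-" << prev_idx;
  return out.str();
}
```

With this sketch, FormatIndexRanges({1, 2, 3, 4, 6}) yields "1-4,6", which matches the example given in the comment on PrettyPrintIndexRanges(). A single index prints on its own, and an empty input prints nothing.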
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index b55ac54..db8f6e6 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -30,7 +30,10 @@
 namespace impala {
 
 class ObjectPool;
+class RuntimeProfile;
 
+/// RuntimeProfileBase is the common base class of all runtime profiles.
+///
 /// Runtime profile is a group of profiling counters.  It supports adding named counters
 /// and being able to serialize and deserialize them.
 /// The profiles support a tree structure to form a hierarchy of counters.
@@ -76,8 +79,7 @@
 ///         of previously retrieved values.
 ///
 /// All methods are thread-safe unless otherwise mentioned.
-class RuntimeProfile { // NOLINT: This struct is not packed, but there are not so many
-                       // of them that it makes a performance difference
+class RuntimeProfileBase {
  public:
   class Counter {
    public:
@@ -111,11 +113,16 @@
 
     virtual int64_t value() const { return value_.Load(); }
 
-    virtual double double_value() const {
-      int64_t v = value_.Load();
+    double double_value() const {
+      int64_t v = value();
       return *reinterpret_cast<const double*>(&v);
     }
 
+    /// Prints the contents of the counter in a name: value format, prefixed on
+    /// each line by 'prefix' and terminated with a newline.
+    virtual void PrettyPrint(
+        const std::string& prefix, const std::string& name, std::ostream* s) const;
+
     /// Builds a new Value into 'val', using (if required) the allocator from
     /// 'document'. Should set the following fields where appropriate:
     /// counter_name, value, kind, unit
@@ -131,30 +138,245 @@
   };
 
   class AveragedCounter;
+  class SummaryStatsCounter;
+
+  typedef boost::function<int64_t()> SampleFunction;
+
+  virtual ~RuntimeProfileBase();
+
+  /// Gets the counter object with 'name'.  Returns NULL if there is no counter with
+  /// that name.
+  Counter* GetCounter(const std::string& name);
+
+  /// Adds all counters with 'name' that are registered either in this or
+  /// in any of the child profiles to 'counters'.
+  void GetCounters(const std::string& name, std::vector<Counter*>* counters);
+
+  /// Recursively compute the fraction of the 'total_time' spent in this profile and
+  /// its children. This function updates local_time_frac_ for each profile.
+  void ComputeTimeInProfile();
+
+  /// Prints the contents of the profile in a name: value format.
+  /// Does not hold locks when it makes any function calls.
+  void PrettyPrint(std::ostream* s, const std::string& prefix = "") const;
+
+  void GetChildren(std::vector<RuntimeProfileBase*>* children);
+
+  /// Gets all profiles in tree, including this one.
+  void GetAllChildren(std::vector<RuntimeProfileBase*>* children);
+
+  /// Adds a string to the runtime profile.  If a value already exists for 'key',
+  /// the value will be updated.
+  /// TODO: IMPALA-9382: this can be moved to RuntimeProfile once we remove callsites for
+  /// this function on AggregatedRuntimeProfile.
+  void AddInfoString(const std::string& key, const std::string& value);
+
+  /// Returns name of this profile
+  const std::string& name() const { return name_; }
+  const TRuntimeProfileNodeMetadata& metadata() const { return metadata_; }
+
+  /// Returns the counter for the total elapsed time.
+  Counter* total_time_counter() const {
+    auto it = counter_map_.find(TOTAL_TIME_COUNTER_NAME);
+    DCHECK(it != counter_map_.end());
+    return it->second;
+  }
+
+  /// Returns the counter for the inactive time.
+  Counter* inactive_timer() const {
+    auto it = counter_map_.find(INACTIVE_TIME_COUNTER_NAME);
+    DCHECK(it != counter_map_.end());
+    return it->second;
+  }
+
+  int64_t local_time() const { return local_time_ns_.Load(); }
+  int64_t total_time() const { return total_time_ns_.Load(); }
+
+  /// Returns the number of counters in this profile. Used for unit tests.
+  int num_counters() const;
+
+  /// Return the number of input instances that contributed to this profile.
+  /// Always 1 for non-aggregated profiles.
+  virtual int GetNumInputProfiles() const = 0;
+
+ protected:
+  /// Name of the counter maintaining the total time.
+  static const std::string TOTAL_TIME_COUNTER_NAME;
+  static const std::string LOCAL_TIME_COUNTER_NAME;
+  static const std::string INACTIVE_TIME_COUNTER_NAME;
+
+  /// Pool for allocated counters. Usually owned by the creator of this
+  /// object, but occasionally allocated in the constructor.
+  ObjectPool* pool_;
+
+  /// Name for this runtime profile.
+  std::string name_;
+
+  /// Detailed metadata that identifies the plan node, sink, etc.
+  TRuntimeProfileNodeMetadata metadata_;
+
+  /// Map from counter names to counters.  The profile owns the memory for the
+  /// counters.
+  typedef std::map<std::string, Counter*> CounterMap;
+  CounterMap counter_map_;
+
+  /// Map from parent counter name to a set of child counter names.
+  /// All top level counters are the child of "" (root).
+  typedef std::map<std::string, std::set<std::string>> ChildCounterMap;
+  ChildCounterMap child_counter_map_;
+
+  /// Protects counter_map_, child_counter_map_, RuntimeProfile::bucketing_counters_,
+  /// RuntimeProfile::rate_counters_, RuntimeProfile::sampling_counters_,
+  /// RuntimeProfile::time_series_counter_map_, and
+  /// RuntimeProfile::has_active_periodic_counters_.
+  mutable SpinLock counter_map_lock_;
+
+  /// TODO: IMPALA-9382: info strings can be moved to RuntimeProfile once we remove
+  /// callsites for AddInfoString() on AggregatedRuntimeProfile.
+  typedef std::map<std::string, std::string> InfoStrings;
+  InfoStrings info_strings_;
+
+  /// Keeps track of the order in which InfoStrings are displayed when printed.
+  typedef std::vector<std::string> InfoStringsDisplayOrder;
+  InfoStringsDisplayOrder info_strings_display_order_;
+
+  /// Protects info_strings_ and info_strings_display_order_.
+  mutable SpinLock info_strings_lock_;
+
+  /// Child profiles. Does not own memory.
+  /// We record children in both a map (to facilitate updates) and a vector
+  /// (to print things in the order they were registered)
+  typedef std::map<std::string, RuntimeProfileBase*> ChildMap;
+  ChildMap child_map_;
+
+  /// Vector of (profile, indentation flag).
+  typedef std::vector<std::pair<RuntimeProfileBase*, bool>> ChildVector;
+  ChildVector children_;
+
+  /// Protects child_map_ and children_.
+  mutable SpinLock children_lock_;
+
+  /// Time spent just in this profile (i.e. not the children) as a fraction
+  /// of the total time in the entire profile tree. This is a double's bit pattern
+  /// stored in an integer. Computed in ComputeTimeInProfile().
+  /// Atomic so that it can be read concurrently with the value being calculated.
+  AtomicInt64 local_time_frac_{0};
+
+  /// Time spent in this node (not including the children). Computed in
+  /// ComputeTimeInProfile(). Atomic b/c it can be read concurrently with
+  /// ComputeTimeInProfile() executing.
+  AtomicInt64 local_time_ns_{0};
+
+  /// Total time spent in this node. Computed in ComputeTimeInProfile() and is
+  /// the maximum of the total time spent in children and the value of
+  /// counter_total_time_. Atomic b/c it can be read concurrently with
+  /// ComputeTimeInProfile() executing.
+  AtomicInt64 total_time_ns_{0};
+
+  RuntimeProfileBase(ObjectPool* pool, const std::string& name);
+
+  /// Inserts 'child' before the iterator 'insert_pos' in 'children_'.
+  /// 'children_lock_' must be held by the caller.
+  void AddChildLocked(
+      RuntimeProfileBase* child, bool indent, ChildVector::iterator insert_pos);
+
+  /// Clear all chunked time series counters in this profile and all children.
+  virtual void ClearChunkedTimeSeriesCounters();
+
+  struct CollectedNode {
+    CollectedNode(const RuntimeProfileBase* node, bool indent, int num_children)
+      : node(node), indent(indent), num_children(num_children) {}
+    const RuntimeProfileBase* const node;
+    const bool indent;
+    const int num_children;
+  };
+
+  /// Collect this node and descendants into 'nodes'. The order is a pre-order traversal.
+  /// 'indent' is true if this node should be indented.
+  void CollectNodes(bool indent, std::vector<CollectedNode>* nodes) const;
+
+  /// Helpers to serialize the individual plan nodes to thrift.
+  void ToThriftHelper(std::vector<TRuntimeProfileNode>* nodes) const;
+  void ToThriftHelper(TRuntimeProfileNode* out_node) const;
+
+  /// Adds subclass-specific state to 'out_node'.
+  virtual void ToThriftSubclass(
+      std::vector<std::pair<const std::string&, const Counter*>>& counter_map_entries,
+      TRuntimeProfileNode* out_node) const = 0;
+
+  /// Create a subtree of runtime profiles from nodes, starting at *node_idx.
+  /// On return, *node_idx is the index one past the end of this subtree.
+  static RuntimeProfileBase* CreateFromThriftHelper(
+      ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
+
+  /// Init subclass-specific state from 'node'.
+  virtual void InitFromThrift(const TRuntimeProfileNode& node, ObjectPool* pool) = 0;
+
+  /// Helper for ToJson().
+  void ToJsonHelper(rapidjson::Value* parent, rapidjson::Document* d) const;
+
+  /// Adds subclass-specific state to 'parent'.
+  virtual void ToJsonSubclass(rapidjson::Value* parent, rapidjson::Document* d) const = 0;
+
+  /// Print the child counters of the given counter name.
+  static void PrintChildCounters(const std::string& prefix,
+      const std::string& counter_name, const CounterMap& counter_map,
+      const ChildCounterMap& child_counter_map, std::ostream* s);
+
+  /// Print info strings. Implemented by the subclass, which may store them
+  /// in different ways.
+  virtual void PrettyPrintInfoStrings(
+      std::ostream* s, const std::string& prefix) const = 0;
+
+  /// Print any additional counters from the subclass.
+  virtual void PrettyPrintSubclassCounters(
+      std::ostream* s, const std::string& prefix) const = 0;
+
+  /// Add all the counters of this instance into the given parent node in JSON format.
+  /// Args:
+  ///   parent: the root node to add all the counters to
+  ///   d: the JSON document, which can be used to get the allocator
+  ///   counter_name: used to look up its child counters in child_counter_map
+  ///   counter_map: a map from counter name to counter
+  ///   child_counter_map: a map from counter name to the names of its child counters
+  void ToJsonCounters(rapidjson::Value* parent, rapidjson::Document* d,
+      const std::string& counter_name, const CounterMap& counter_map,
+      const ChildCounterMap& child_counter_map) const;
+
+  /// Implementation of AddInfoString() and AppendInfoString(). If 'append' is false,
+  /// implements AddInfoString(), otherwise implements AppendInfoString().
+  /// Redaction rules are applied on the info string if 'redact' is true.
+  /// Trailing whitespace is removed.
+  /// TODO: IMPALA-9382: this can be moved to RuntimeProfile once we remove callsites for
+  /// this function on AggregatedRuntimeProfile.
+  void AddInfoStringInternal(
+      const std::string& key, std::string value, bool append, bool redact = false);
+};
+
+/// A standard runtime profile that can be mutated and updated.
+class RuntimeProfile : public RuntimeProfileBase {
+ public:
+  // Import the nested class names from the base class so we can still refer
+  // to them as RuntimeProfile::Counter, etc in the rest of the codebase.
+  using RuntimeProfileBase::Counter;
+  using RuntimeProfileBase::SummaryStatsCounter;
   class ConcurrentTimerCounter;
   class DerivedCounter;
   class HighWaterMarkCounter;
-  class SummaryStatsCounter;
   class EventSequence;
   class ThreadCounters;
   class TimeSeriesCounter;
   class SamplingTimeSeriesCounter;
   class ChunkedTimeSeriesCounter;
 
-  typedef boost::function<int64_t ()> SampleFunction;
-
   /// Create a runtime profile object with 'name'. The profile, counters and any other
   /// structures owned by the profile are allocated from 'pool'.
-  /// If 'is_averaged_profile' is true, the counters in this profile will be derived
-  /// averages (of unit AveragedCounter) from other profiles, so the counter map will
-  /// be left empty. Otherwise, the counter map is initialized with a single entry for
-  /// TotalTime.
-  static RuntimeProfile* Create(ObjectPool* pool, const std::string& name,
-      bool is_averaged_profile = false);
+  static RuntimeProfile* Create(ObjectPool* pool, const std::string& name);
 
   ~RuntimeProfile();
 
-  /// Deserialize from thrift.  Runtime profiles are allocated from the pool.
+  /// Deserialize a runtime profile tree from thrift. Allocated objects are stored in
+  /// 'pool'.
   static RuntimeProfile* CreateFromThrift(ObjectPool* pool,
       const TRuntimeProfileTree& profiles);
 
@@ -165,12 +387,12 @@
   /// relative to the parent.
   /// If location is non-null, child will be inserted after location.  Location must
   /// already be added to the profile.
-  void AddChild(RuntimeProfile* child,
-      bool indent = true, RuntimeProfile* location = NULL);
+  void AddChild(
+      RuntimeProfileBase* child, bool indent = true, RuntimeProfile* location = NULL);
 
   /// Adds a child profile, similarly to AddChild(). The child profile is put before any
   /// existing profiles.
-  void PrependChild(RuntimeProfile* child, bool indent = true);
+  void PrependChild(RuntimeProfileBase* child, bool indent = true);
 
   /// Creates a new child profile with the given 'name'. A child profile with that name
   /// must not already exist. If 'prepend' is true, prepended before other child profiles,
@@ -182,19 +404,11 @@
   /// invalidate pointers to profiles.
   void SortChildrenByTotalTime();
 
-  /// Updates the AveragedCounter counters in this profile with the counters from the
-  /// 'src' profile. If a counter is present in 'src' but missing in this profile, a new
-  /// AveragedCounter is created with the same name. This method should not be invoked
-  /// if is_average_profile_ is false. Obtains locks on the counter maps and child counter
-  /// maps in both this and 'src' profiles.
-  void UpdateAverage(RuntimeProfile* src);
-
   /// Updates this profile w/ the thrift profile.
   /// Counters and child profiles in thrift_profile that already exist in this profile
   /// are updated. Counters that do not already exist are created.
   /// Info strings matched up by key and are updated or added, depending on whether
   /// the key has already been registered.
-  /// TODO: Event sequences are ignored
   void Update(const TRuntimeProfileTree& thrift_profile);
 
   /// Add a counter with 'name'/'unit'.  Returns a counter object that the caller can
@@ -235,22 +449,6 @@
   // once.
   void AddLocalTimeCounter(const SampleFunction& counter_fn);
 
-  /// Gets the counter object with 'name'.  Returns NULL if there is no counter with
-  /// that name.
-  Counter* GetCounter(const std::string& name);
-
-  /// Gets the summary stats counter with 'name'. Returns NULL if there is no summary
-  /// stats counter with that name.
-  SummaryStatsCounter* GetSummaryStatsCounter(const std::string& name);
-
-  /// Adds all counters with 'name' that are registered either in this or
-  /// in any of the child profiles to 'counters'.
-  void GetCounters(const std::string& name, std::vector<Counter*>* counters);
-
-  /// Adds a string to the runtime profile.  If a value already exists for 'key',
-  /// the value will be updated.
-  void AddInfoString(const std::string& key, const std::string& value);
-
   /// Same as AddInfoString(), except that this method applies the redaction
   /// rules on 'value' before adding it to the runtime profile.
   void AddInfoStringRedacted(const std::string& key, const std::string& value);
@@ -286,8 +484,6 @@
   /// TODO: EventSequences are not merged by Merge() or Update()
   EventSequence* AddEventSequence(const std::string& key);
   EventSequence* AddEventSequence(const std::string& key, const TEventSequence& from);
-
-  /// Returns event sequence with the provided name if it exists, otherwise NULL.
   EventSequence* GetEventSequence(const std::string& name) const;
 
   /// Updates 'value' of info string with 'key'. No-op if the key doesn't exist.
@@ -297,77 +493,20 @@
   /// the key does not exist.
   const std::string* GetInfoString(const std::string& key) const;
 
+  /// Gets the summary stats counter with 'name'. Returns NULL if there is no summary
+  /// stats counter with that name.
+  SummaryStatsCounter* GetSummaryStatsCounter(const std::string& name);
+
   /// Stops updating all counters in this profile that are periodically updated by a
   /// background thread (i.e. sampling, rate, bucketing and time series counters).
   /// Must be called before the profile is destroyed if any such counters are active.
   /// Does not stop counters on descendant profiles.
   void StopPeriodicCounters();
 
-  /// Returns the counter for the total elapsed time.
-  Counter* total_time_counter() { return counter_map_[TOTAL_TIME_COUNTER_NAME]; }
-  Counter* inactive_timer() { return counter_map_[INACTIVE_TIME_COUNTER_NAME]; }
-  int64_t local_time() { return local_time_ns_.Load(); }
-  int64_t total_time() { return total_time_ns_.Load(); }
-
-  /// Prints the contents of the profile in a name: value format.
-  /// Does not hold locks when it makes any function calls.
-  void PrettyPrint(std::ostream* s, const std::string& prefix="") const;
-
-  /// Serializes profile to thrift.
-  /// Does not hold locks when it makes any function calls.
-  void ToThrift(TRuntimeProfileTree* tree) const;
-
-  /// Store profile into JSON format into a document
-  void ToJsonHelper(rapidjson::Value* parent, rapidjson::Document* d) const;
-  void ToJson(rapidjson::Document* d) const;
-
-  /// Serializes the runtime profile to a buffer.  This first serializes the
-  /// object using thrift compact binary format and then gzip compresses it.
-  /// This is not a lightweight operation and should not be in the hot path.
-  Status Compress(std::vector<uint8_t>* out) const;
-
-  /// Deserializes a compressed profile into a TRuntimeProfileTree. 'compressed_profile'
-  /// is expected to have been serialized by Compress().
-  static Status DecompressToThrift(
-      const std::vector<uint8_t>& compressed_profile, TRuntimeProfileTree* out);
-
-  /// Deserializes a compressed profile into a RuntimeProfile tree owned by 'pool'.
-  /// 'compressed_profile' is expected to have been serialized by Compress().
-  static Status DecompressToProfile(const std::vector<uint8_t>& compressed_profile,
-      ObjectPool* pool, RuntimeProfile** out);
-
-  /// Serializes the runtime profile to a string.  This first serializes the
-  /// object using thrift compact binary format, then gzip compresses it and
-  /// finally encodes it as base64.  This is not a lightweight operation and
-  /// should not be in the hot path.
-  Status SerializeToArchiveString(std::string* out) const WARN_UNUSED_RESULT;
-  Status SerializeToArchiveString(std::stringstream* out) const WARN_UNUSED_RESULT;
-
-  /// Deserializes a string into a TRuntimeProfileTree. 'archive_str' is expected to have
-  /// been serialized by SerializeToArchiveString().
-  static Status DeserializeFromArchiveString(
-      const std::string& archive_str, TRuntimeProfileTree* out);
-
-  /// Divides all counters by n
-  void Divide(int n);
-
-  void GetChildren(std::vector<RuntimeProfile*>* children);
-
-  /// Gets all profiles in tree, including this one.
-  void GetAllChildren(std::vector<RuntimeProfile*>* children);
-
-  /// Returns the number of counters in this profile
-  int num_counters() const { return counter_map_.size(); }
-
-  /// Returns name of this profile
-  const std::string& name() const { return name_; }
-
   /// *only call this on top-level profiles*
   /// (because it doesn't re-file child profiles)
   void set_name(const std::string& name) { name_ = name; }
 
-  const TRuntimeProfileNodeMetadata& metadata() const { return metadata_; }
-
   /// Called if this corresponds to a plan node. Sets metadata so that later code that
   /// analyzes the profile can identify this as the plan node's profile.
   void SetPlanNodeId(int node_id);
@@ -446,43 +585,73 @@
       const std::string& name, TUnit::type unit, SampleFunction sample_fn);
 
   /// Clear all chunked time series counters in this profile and all children.
-  void ClearChunkedTimeSeriesCounters();
+  void ClearChunkedTimeSeriesCounters() override;
 
-  /// Recursively compute the fraction of the 'total_time' spent in this profile and
-  /// its children.
-  /// This function updates local_time_frac_ for each profile.
-  void ComputeTimeInProfile();
+  /// Serializes an entire runtime profile tree to thrift.
+  /// This is defined in RuntimeProfile instead of RuntimeProfileBase because we require
+  /// that a runtime profile root be a RuntimeProfile.
+  /// Does not hold locks when it makes any function calls.
+  void ToThrift(TRuntimeProfileTree* tree) const;
+
+  /// Store an entire runtime profile tree into JSON document 'd'.
+  void ToJson(rapidjson::Document* d) const;
+
+  /// Serializes the runtime profile to a buffer.  This first serializes the
+  /// object using thrift compact binary format and then gzip compresses it.
+  /// This is not a lightweight operation and should not be in the hot path.
+  Status Compress(std::vector<uint8_t>* out) const;
+
+  /// Deserializes a compressed profile into a TRuntimeProfileTree. 'compressed_profile'
+  /// is expected to have been serialized by Compress().
+  static Status DecompressToThrift(
+      const std::vector<uint8_t>& compressed_profile, TRuntimeProfileTree* out);
+
+  /// Deserializes a compressed profile into a RuntimeProfile tree owned by 'pool'.
+  /// 'compressed_profile' is expected to have been serialized by Compress().
+  static Status DecompressToProfile(const std::vector<uint8_t>& compressed_profile,
+      ObjectPool* pool, RuntimeProfile** out);
+
+  /// Serializes the runtime profile to a string.  This first serializes the
+  /// object using thrift compact binary format, then gzip compresses it and
+  /// finally encodes it as base64.  This is not a lightweight operation and
+  /// should not be in the hot path.
+  Status SerializeToArchiveString(std::string* out) const;
+  Status SerializeToArchiveString(std::stringstream* out) const;
+
+  /// Deserializes a string into a TRuntimeProfileTree. 'archive_str' is expected to have
+  /// been serialized by SerializeToArchiveString().
+  static Status DeserializeFromArchiveString(
+      const std::string& archive_str, TRuntimeProfileTree* out);
 
   /// Set ExecSummary
   void SetTExecSummary(const TExecSummary& summary);
 
-  /// Get a copy of exec_summary tp t_exec_summary
+  /// Get a copy of exec_summary to t_exec_summary
   void GetExecSummary(TExecSummary* t_exec_summary) const;
 
+ protected:
+  int GetNumInputProfiles() const override { return 1; }
+
+  /// Adds subclass-specific state to 'out_node'.
+  void ToThriftSubclass(
+      std::vector<std::pair<const std::string&, const Counter*>>& counter_map_entries,
+      TRuntimeProfileNode* out_node) const override;
+
+  /// Init subclass-specific state from 'node'.
+  void InitFromThrift(const TRuntimeProfileNode& node, ObjectPool* pool) override;
+
+  /// Adds subclass-specific state to 'parent'.
+  void ToJsonSubclass(rapidjson::Value* parent, rapidjson::Document* d) const override;
+
+  /// Print info strings from this subclass.
+  void PrettyPrintInfoStrings(std::ostream* s, const std::string& prefix) const override;
+
+  /// Print any additional counters from this subclass.
+  void PrettyPrintSubclassCounters(
+      std::ostream* s, const std::string& prefix) const override;
+
  private:
-  /// Pool for allocated counters. Usually owned by the creator of this
-  /// object, but occasionally allocated in the constructor.
-  ObjectPool* pool_;
-
-  /// Name for this runtime profile.
-  std::string name_;
-
-  /// Detailed metadata that identifies the plan node, sink, etc.
-  TRuntimeProfileNodeMetadata metadata_;
-
-  /// True if this profile is an average derived from other profiles.
-  /// All counters in this profile must be of unit AveragedCounter.
-  bool is_averaged_profile_;
-
-  /// Map from counter names to counters.  The profile owns the memory for the
-  /// counters.
-  typedef std::map<std::string, Counter*> CounterMap;
-  CounterMap counter_map_;
-
-  /// Map from parent counter name to a set of child counter name.
-  /// All top level counters are the child of "" (root).
-  typedef std::map<std::string, std::set<std::string>> ChildCounterMap;
-  ChildCounterMap child_counter_map_;
+  friend class AggregatedRuntimeProfile;
 
   /// A set of bucket counters registered in this runtime profile.
   std::set<std::vector<Counter*>*> bucketing_counters_;
@@ -504,33 +673,6 @@
   /// sampling and time series counters.
   bool has_active_periodic_counters_ = false;
 
-  /// Protects counter_map_, child_counter_map_, bucketing_counters_, rate_counters_,
-  /// sampling_counters_, time_series_counter_map_, and has_active_periodic_counters_.
-  mutable SpinLock counter_map_lock_;
-
-  /// Child profiles.  Does not own memory.
-  /// We record children in both a map (to facilitate updates) and a vector
-  /// (to print things in the order they were registered)
-  typedef std::map<std::string, RuntimeProfile*> ChildMap;
-  ChildMap child_map_;
-
-  /// Vector of (profile, indentation flag).
-  typedef std::vector<std::pair<RuntimeProfile*, bool>> ChildVector;
-  ChildVector children_;
-
-  /// Protects child_map_ and children_.
-  mutable SpinLock children_lock_;
-
-  typedef std::map<std::string, std::string> InfoStrings;
-  InfoStrings info_strings_;
-
-  /// Keeps track of the order in which InfoStrings are displayed when printed.
-  typedef std::vector<std::string> InfoStringsDisplayOrder;
-  InfoStringsDisplayOrder info_strings_display_order_;
-
-  /// Protects info_strings_ and info_strings_display_order_.
-  mutable SpinLock info_strings_lock_;
-
   typedef std::map<std::string, EventSequence*> EventSequenceMap;
   EventSequenceMap event_sequence_map_;
 
@@ -543,29 +685,12 @@
   /// Protects summary_stats_map_.
   mutable SpinLock summary_stats_map_lock_;
 
-  Counter counter_total_time_;
+  Counter counter_total_time_{TUnit::TIME_NS};
 
   /// Total time spent waiting (on non-children) that should not be counted when
   /// computing local_time_frac_. This is updated for example in the exchange
   /// node when waiting on the sender from another fragment.
-  Counter inactive_timer_;
-
-  /// Time spent in just in this profile (i.e. not the children) as a fraction
-  /// of the total time in the entire profile tree. This is a double's bit pattern
-  /// stored in an integer. Computed in ComputeTimeInProfile().
-  /// Atomic so that it can be read concurrently with the value being calculated.
-  AtomicInt64 local_time_frac_{0};
-
-  /// Time spent in this node (not including the children). Computed in
-  /// ComputeTimeInProfile(). Atomic b/c it can be read concurrently with
-  /// ComputeTimeInProfile() executing.
-  AtomicInt64 local_time_ns_{0};
-
-  /// Total time spent in this node. Computed in ComputeTimeInProfile() and is
-  /// the maximum of the total time spent in children and the value of
-  /// counter_total_time_. Atomic b/c it can be read concurrently with
-  /// ComputeTimeInProfile() executing.
-  AtomicInt64 total_time_ns_{0};
+  Counter inactive_timer_{TUnit::TIME_NS};
 
   /// The Exec Summary
   TExecSummary t_exec_summary_;
@@ -574,53 +699,15 @@
   mutable SpinLock t_exec_summary_lock_;
 
   /// Constructor used by Create().
-  RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile);
+  RuntimeProfile(ObjectPool* pool, const std::string& name);
 
   /// Update a subtree of profiles from nodes, rooted at *idx.
   /// On return, *idx points to the node immediately following this subtree.
   void Update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
 
-  /// Helper function to compute compute the fraction of the total time spent in
-  /// this profile and its children.
-  /// Called recusively.
-  void ComputeTimeInProfile(int64_t total_time);
-
-  /// Implementation of AddInfoString() and AppendInfoString(). If 'append' is false,
-  /// implements AddInfoString(), otherwise implements AppendInfoString().
-  /// Redaction rules are applied on the info string if 'redact' is true.
-  /// Trailing whitspace is removed.
-  void AddInfoStringInternal(
-      const std::string& key, std::string value, bool append, bool redact = false);
-
-  /// Helper to serialize the individual plan nodes to thrift.
-  void ToThrift(std::vector<TRuntimeProfileNode>* nodes) const;
-  void ToThrift(TRuntimeProfileNode* out_node) const;
-
-  struct CollectedNode {
-    CollectedNode(const RuntimeProfile* node, bool indent, int num_children) :
-      node(node), indent(indent), num_children(num_children) {}
-    const RuntimeProfile* const node;
-    const bool indent;
-    const int num_children;
-  };
-
-  /// Collect this node and descendants into 'nodes'. The order is a pre-order traversal
-  /// 'indent' is true if this node should be indented.
-  void CollectNodes(bool indent, std::vector<CollectedNode>* nodes) const;
-
   /// Send exec_summary to thrift
   void ExecSummaryToThrift(TRuntimeProfileTree* tree) const;
 
-  /// Name of the counter maintaining the total time.
-  static const std::string TOTAL_TIME_COUNTER_NAME;
-  static const std::string LOCAL_TIME_COUNTER_NAME;
-  static const std::string INACTIVE_TIME_COUNTER_NAME;
-
-  /// Create a subtree of runtime profiles from nodes, starting at *node_idx.
-  /// On return, *node_idx is the index one past the end of this subtree
-  static RuntimeProfile* CreateFromThrift(
-      ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
-
   /// Internal implementations of the Add*Counter() functions for use when the caller
   /// holds counter_map_lock_. Also returns 'created', which is true if a new counter was
   /// created and false if a counter with the given name already existed.
@@ -630,26 +717,101 @@
       TUnit::type unit, const std::string& parent_counter_name, bool* created);
   ConcurrentTimerCounter* AddConcurrentTimerCounterLocked(const std::string& name,
       TUnit::type unit, const std::string& parent_counter_name, bool* created);
+};
 
-  ///  Inserts 'child' before the iterator 'insert_pos' in 'children_'.
-  /// 'children_lock_' must be held by the caller.
-  void AddChildLocked(
-      RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos);
+/// An aggregated profile that results from combining one or more RuntimeProfiles.
+/// Contains averaged and otherwise aggregated versions of the counters from the
+/// input profiles.
+///
+/// An AggregatedRuntimeProfile can only have other AggregatedRuntimeProfiles as
+/// children.
+class AggregatedRuntimeProfile : public RuntimeProfileBase {
+ public:
+  /// Create an aggregated runtime profile with 'name'. The profile, counters and any
+  /// other structures owned by the profile are allocated from 'pool'.
+  /// The counters in this profile will be aggregated AveragedCounters merged
+  /// from other profiles.
+  ///
+  /// 'is_root' must be true if this is the root of the averaged profile tree which
+  /// will have Update() called on it. The descendants of the root are created and
+  /// updated via Update().
+  static AggregatedRuntimeProfile* Create(ObjectPool* pool, const std::string& name,
+      int num_input_profiles, bool is_root = true);
 
-  /// Print the child counters of the given counter name
-  static void PrintChildCounters(const std::string& prefix,
-      const std::string& counter_name, const CounterMap& counter_map,
-      const ChildCounterMap& child_counter_map, std::ostream* s);
+  /// Updates the AveragedCounter counters in this profile with the counters from the
+  /// 'src' profile. If a counter is present in 'src' but missing in this profile, a new
+  /// AveragedCounter is created with the same name. Obtains locks on the counter maps
+  /// and child counter maps in both this and 'src' profiles.
+  /// TODO: IMPALA-9382: update method name and comment.
+  ///
+  /// Note that every profile in the 'src' tree must be a RuntimeProfile - no
+  /// AggregatedRuntimeProfiles can be part of the input.
+  void Update(RuntimeProfile* src, int idx);
 
-  /// Add all the counters of this instance into the given parent node in JSON format
-  /// Args:
-  ///   parent: the root node to add all the counters
-  ///   d: document of this json, could be used to get Allocator
-  ///   counter_name: this will be used to find its child counters in child_counter_map
-  ///   counter_map: A map of counters name to counter
-  ///   child_counter_map: A map of counter to its child counters
-  void ToJsonCounters(rapidjson::Value* parent, rapidjson::Document* d,
-      const string& counter_name, const CounterMap& counter_map,
-      const ChildCounterMap& child_counter_map) const;
+ protected:
+  int GetNumInputProfiles() const override { return num_input_profiles_; }
+
+  /// Adds subclass-specific state to 'out_node'. 'counter_map_entries' is a snapshot
+  /// of 'counter_map_'.
+  void ToThriftSubclass(
+      std::vector<std::pair<const std::string&, const Counter*>>& counter_map_entries,
+      TRuntimeProfileNode* out_node) const override;
+
+  /// Init subclass-specific state from 'node'.
+  void InitFromThrift(const TRuntimeProfileNode& node, ObjectPool* pool) override;
+
+  /// Adds subclass-specific state to 'parent'.
+  void ToJsonSubclass(rapidjson::Value* parent, rapidjson::Document* d) const override;
+
+  /// Print info strings from this subclass.
+  void PrettyPrintInfoStrings(std::ostream* s, const std::string& prefix) const override;
+
+  /// Print any additional counters from this subclass.
+  void PrettyPrintSubclassCounters(
+      std::ostream* s, const std::string& prefix) const override;
+
+ private:
+  /// Number of profiles that will contribute to this aggregated profile.
+  const int num_input_profiles_;
+
+  /// Names of the input profiles. Only used on averaged profiles that are the root
+  /// of an averaged profile tree.
+  /// The size is 'num_input_profiles_'. Protected by 'input_profile_name_lock_'.
+  std::vector<std::string> input_profile_names_;
+  mutable SpinLock input_profile_name_lock_;
+
+  /// Aggregated info strings from the input profile. The key is the info string name.
+  /// The value vector contains an entry for every input profile.
+  std::map<std::string, std::vector<std::string>> agg_info_strings_;
+
+  /// Protects 'agg_info_strings_'.
+  mutable SpinLock agg_info_strings_lock_;
+
+  /// Per-instance summary stats. The value is the unit plus the counter from each
+  /// instance. Some of the counters may be null if the input profile did not have
+  /// that counter present.
+  /// Protected by 'summary_stats_map_lock_'.
+  typedef std::map<std::string, std::pair<TUnit::type, std::vector<SummaryStatsCounter*>>>
+      AggSummaryStatsCounterMap;
+  AggSummaryStatsCounterMap summary_stats_map_;
+
+  /// Protects summary_stats_map_.
+  mutable SpinLock summary_stats_map_lock_;
+
+  AggregatedRuntimeProfile(
+      ObjectPool* pool, const std::string& name, int num_input_profiles, bool is_root);
+
+  /// Group the values in 'info_string_values' by value, with a vector of indices where
+  /// that value appears. 'info_string_values' must be a value from 'agg_info_strings_'.
+  std::map<std::string, std::vector<int32_t>> GroupDistinctInfoStrings(
+      const std::vector<std::string>& info_string_values) const;
+
+  /// Helper for Update() that is invoked recursively on 'src'.
+  void UpdateRecursive(RuntimeProfile* src, int idx);
+
+  /// Aggregate summary stats into a single counter. Entries in
+  /// 'counters' may be NULL.
+  static void AggregateSummaryStats(
+      const std::vector<SummaryStatsCounter*> counters, SummaryStatsCounter* result);
 };
 }
diff --git a/common/thrift/RuntimeProfile.thrift b/common/thrift/RuntimeProfile.thrift
index 95d370a..ea63b9f 100644
--- a/common/thrift/RuntimeProfile.thrift
+++ b/common/thrift/RuntimeProfile.thrift
@@ -54,6 +54,18 @@
   3: required i64 value
 }
 
+// Aggregated version of TCounter, belonging to a TAggregatedRuntimeProfileNode.
+// Lists have TAggregatedRuntimeProfileNode.num_instances entries.
+struct TAggCounter {
+  1: required string name
+  2: required Metrics.TUnit unit
+  // True if a value was set for this instance.
+  3: required list<bool> has_value
+  // The actual values. values[i] holds the value if has_value[i] == true, and is ignored
+  // if has_value[i] == false.
+  4: required list<i64> values
+}
+
 // Thrift version of RuntimeProfile::EventSequence - list of (label, timestamp) pairs
 // which represent an ordered sequence of events.
 struct TEventSequence {
@@ -95,6 +107,25 @@
   6: required i64 max_value
 }
 
+// Aggregated version of TSummaryStatsCounter, belonging to a
+// TAggregatedRuntimeProfileNode.
+// Lists have TAggregatedRuntimeProfileNode.num_instances entries.
+struct TAggSummaryStatsCounter {
+  1: required string name
+  2: required Metrics.TUnit unit
+
+  // True if a value was set for this instance.
+  3: required list<bool> has_value
+
+  // The actual values of the summary stats counter, with each field stored in a separate
+  // list. The ith element of each list holds a valid value if has_value[i] == true, and
+  // is ignored if has_value[i] == false.
+  4: required list<i64> sum
+  5: required list<i64> total_num_values
+  6: required list<i64> min_value
+  7: required list<i64> max_value
+}
+
 // Metadata to help identify what entity the profile node corresponds to.
 union TRuntimeProfileNodeMetadata {
   // Set if this node corresponds to a plan node.
@@ -104,10 +135,62 @@
   2: Types.TDataSinkId data_sink_id
 }
 
-// A single runtime profile
+struct TAggregatedRuntimeProfileNode {
+  // Number of instances that were included in the stats in this profile.
+  1: optional i32 num_instances
+
+  // Names of the profiles that contributed to this aggregated profile. This is only
+  // set at the root of the aggregated profile tree. Nodes below that root will have
+  // the same number of instances but the names of the input profiles are not duplicated.
+  // The indices of these instances are used in many of the aggregated profile elements.
+  // E.g. indices in TAggCounter.values line up with these indices, and the indices used
+  // as map values in info_strings correspond to the indices here.
+  2: optional list<string> input_profiles
+
+  // The nesting of these counters is determined by 'child_counters_map' in the parent
+  // TRuntimeProfileNode.
+  3: optional list<TAggCounter> counters
+
+  // Info strings stored in a dense representation. The first map key is the info string
+  // key. The second map key is a distinct value of that key. The value is a list of
+  // input indices that had the value. This means that the common case, where many
+  // instances have identical strings, can be represented very compactly.
+  //
+  // E.g. the "ExecOption" and "Table Name" fields for 10 HDFS_SCAN_NODE instances could
+  // be represented as:
+  // {
+  //   "Table Name": {"tpch.lineitem": [0,1,2,3,4,5,6,7,8,9]},
+  //   "ExecOption": {
+  //      "ExecOption: TEXT Codegen Enabled, Codegen enabled: 2 out of 2":
+  //       [0,1,2,3,4,6,7,8,9],
+  //      "ExecOption: TEXT Codegen Enabled, Codegen enabled: 3 out of 3":
+  //       [5]
+  //   }
+  // }
+  4: optional map<string, map<string, list<i32>>> info_strings
+
+  // List of summary stats counters.
+  5: optional list<TAggSummaryStatsCounter> summary_stats_counters
+
+  // TODO: IMPALA-9382: decide on representation. This is a placeholder
+  // 6: optional list<list<TEventSequence>> event_sequences
+
+  // TODO: IMPALA-9382: decide on representation. This is a placeholder
+  // 7: optional list<list<TTimeSeriesCounter>> time_series_counters
+}
+
+
+// A single node in the runtime profile tree. This can either be an unaggregated node,
+// which has the singular counter values from the original RuntimeProfile object, or
+// it can be an aggregated node, which was merged together from one or more unaggregated
+// input nodes.
 struct TRuntimeProfileNode {
   1: required string name
   2: required i32 num_children
+
+  // In an unaggregated profile, individual counters.
+  // In an aggregated profile, these are the averaged counters only.
+  // The nesting of these counters is determined by 'child_counters_map' in the parent.
   3: required list<TCounter> counters
 
   // Legacy field. May contain the node ID for plan nodes.
@@ -118,14 +201,21 @@
   // corresponds to indent param of RuntimeProfile::AddChild()
   5: required bool indent
 
+  // ======================================================================
+  // BEGIN: unaggregated state.
+  // These fields represent unaggregated profile state only.
+  // These fields are not used in aggregated profile nodes, i.e. profile V2,
+  // unless otherwise noted.
+  // ======================================================================
   // map of key,value info strings that capture any kind of additional information
   // about the profiled object
   6: required map<string, string> info_strings
 
-  // Auxilliary structure to capture the info strings display order when printed
+  // Auxiliary structure to capture the info strings display order when printed
   7: required list<string> info_strings_display_order
 
-  // map from parent counter name to child counter name
+  // map from parent counter name to child counter name. This applies to both
+  // 'counters' and 'aggregated.counters'.
   8: required map<string, set<string>> child_counters_map
 
   // List of event sequences that capture ordered events in a query's lifetime
@@ -134,18 +224,47 @@
   // List of time series counters
   10: optional list<TTimeSeriesCounter> time_series_counters
 
-  // List of summary stats counters
+  // ======================================================================
+  // END: unaggregated state.
+  // ======================================================================
+
+  // List of summary stats counters.
+  // Used in both unaggregated profiles and aggregated profiles. In aggregated profiles,
+  // summarizes stats from all input profiles.
   11: optional list<TSummaryStatsCounter> summary_stats_counters
 
   // Metadata about the entity that this node refers to.
   12: optional TRuntimeProfileNodeMetadata node_metadata
+
+  // Aggregated profile counters - contains the same counters as above, except transposed
+  // so that one TRuntimeProfileNode contains counters for all input profiles.
+  13: optional TAggregatedRuntimeProfileNode aggregated
 }
 
-// A flattened tree of runtime profiles, obtained by an
-// pre-order traversal
+// A flattened tree of runtime profiles, obtained by a pre-order traversal. The contents
+// of the profile vary between versions.
+//
+// Version 1:
+// The fully-expanded runtime profile tree generated by the query is included. For every
+// fragment, each fragment instance has a separate profile subtree and an averaged profile
+// for the fragment is also included with averaged counter values.
+//
+// Version 2 (experimental):
+// Unlike version 1, there is only a single aggregated profile tree for each
+// fragment. All nodes in this tree use the aggregated profile representation. Otherwise
+// the structure of the profile is the same.
 struct TRuntimeProfileTree {
   1: required list<TRuntimeProfileNode> nodes
   2: optional ExecStats.TExecSummary exec_summary
+
+  // The version of the runtime profile representation. Different versions may have
+  // different invariants or information. Thrift structures for new versions need to
+  // remain readable by readers with old versions of the thrift file, but may remove
+  // information.
+  // Version 1: this field is unset
+  // Version 2: this field is set to 2
+  // TODO: IMPALA-9382: document which versions of Impala generate which version.
+  3: optional i32 profile_version
 }
 
 // A list of TRuntimeProfileTree structures.