IMPALA-4618: Fix #Hosts and add #Inst column in exec summary

When mt_dop > 0, the exec summary reports the number of fragment
instances under the #Hosts column, rather than the number of hosts
that the header implies.

This commit fixes the issue so that the number of hosts is shown
under the #Hosts column. It also adds an #Inst column that shows the
number of instances (the value previously shown under #Hosts).
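
For illustration, here is a minimal standalone sketch of the counting
logic (a simplified stand-in for FInstanceExecParams with a string
host is assumed; the real code in coordinator.cc keys the set on
TNetworkAddress):

    #include <iostream>
    #include <string>
    #include <unordered_set>
    #include <vector>

    // Simplified stand-in for the real FInstanceExecParams.
    struct FInstanceExecParams { std::string host; };

    int main() {
      // With mt_dop > 0, one host can run several instances of a fragment.
      std::vector<FInstanceExecParams> instance_params = {
          {"host-1"}, {"host-1"}, {"host-2"}, {"host-2"}, {"host-3"}, {"host-3"}};
      std::unordered_set<std::string> host_set;
      for (const FInstanceExecParams& instance : instance_params) {
        host_set.insert(instance.host);
      }
      std::cout << "#Hosts: " << host_set.size()          // 3
                << "  #Inst: " << instance_params.size()  // 6
                << std::endl;
      return 0;
    }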

Tests:
 * Changed profile tests with mt_dop > 0.
 * Updated benchmark tests and shell tests accordingly.
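
With this change, the summary header produced by the shell and the
backend becomes:

    Operator  #Hosts  #Inst  Avg Time  Max Time  #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail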

Change-Id: I3bdf9a06d9bd842b2397cd16c28294b6bec7af69
Reviewed-on: http://gerrit.cloudera.org:8080/14715
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1d2e88f..2ba678d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -225,8 +225,15 @@
 
       const TPlan& plan = fragment.plan;
       const TDataSink& output_sink = fragment.output_sink;
-      int num_instances =
-          schedule.GetFragmentExecParams(fragment.idx).instance_exec_params.size();
+      // Count the number of hosts and instances.
+      const vector<FInstanceExecParams>& instance_params =
+          schedule.GetFragmentExecParams(fragment.idx).instance_exec_params;
+      unordered_set<TNetworkAddress> host_set;
+      for (const FInstanceExecParams& instance : instance_params) {
+        host_set.insert(instance.host);
+      }
+      int num_hosts = host_set.size();
+      int num_instances = instance_params.size();
 
       // Add the data sink at the root of the fragment.
       data_sink_id_to_idx_map[fragment.idx] = thrift_exec_summary.nodes.size();
@@ -241,6 +248,7 @@
       node_summary.__set_num_children(1);
       DCHECK(output_sink.__isset.estimated_stats);
       node_summary.__set_estimated_stats(output_sink.estimated_stats);
+      node_summary.__set_num_hosts(num_hosts);
       node_summary.exec_stats.resize(num_instances);
 
       // We don't track rows returned from sinks, but some clients like impala-shell
@@ -262,6 +270,7 @@
         node_summary.__set_num_children(node.num_children);
         DCHECK(node.__isset.estimated_stats);
         node_summary.__set_estimated_stats(node.estimated_stats);
+        node_summary.__set_num_hosts(num_hosts);
         node_summary.exec_stats.resize(num_instances);
       }
 
diff --git a/be/src/util/summary-util.cc b/be/src/util/summary-util.cc
index ab90e87..aded383 100644
--- a/be/src/util/summary-util.cc
+++ b/be/src/util/summary-util.cc
@@ -73,6 +73,7 @@
 
   vector<string> row;
   row.push_back(label_ss.str());
+  row.push_back(lexical_cast<string>(node.num_hosts));
   row.push_back(lexical_cast<string>(node.exec_stats.size())); // Num instances
   row.push_back(PrettyPrinter::Print(avg_time, TUnit::TIME_NS));
   row.push_back(PrettyPrinter::Print(max_stats.latency_ns, TUnit::TIME_NS));
@@ -121,6 +122,7 @@
   printer.set_max_output_width(1000);
   printer.AddColumn("Operator", true);
   printer.AddColumn("#Hosts", false);
+  printer.AddColumn("#Inst", false);
   printer.AddColumn("Avg Time", false);
   printer.AddColumn("Max Time", false);
   printer.AddColumn("#Rows", false);
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 60b6a18..30c1fcd 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -75,6 +75,10 @@
 
   // If true, this is an exchange node that is the receiver of a broadcast.
   8: optional bool is_broadcast
+
+  // The number of hosts. It cannot be inferred from exec_stats, since the length of the
+  // list can be greater when mt_dop > 0.
+  9: optional i32 num_hosts
 }
 
 // Progress counters for an in-flight query.
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 4dd22a8..66eed04 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -560,12 +560,12 @@
     def prettyprint_time(time_val):
       return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
 
-    hosts = 0
+    instances = 0
     if node.exec_stats is not None:
-      hosts = len(node.exec_stats)
+      instances = len(node.exec_stats)
     is_sink = node.node_id == -1
     row = [ label_prefix + node.label,
-            hosts,
+            node.num_hosts, instances,
             prettyprint_time(avg_time),
             prettyprint_time(max_stats.latency_ns),
             "" if is_sink else prettyprint_units(cardinality),
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 554ae26..1e67415 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1077,8 +1077,9 @@
       self.last_summary = time.time()
 
   def _default_summary_table(self):
-    return self.construct_table_with_header(["Operator", "#Hosts", "Avg Time", "Max Time",
-                                             "#Rows", "Est. #Rows", "Peak Mem",
+    return self.construct_table_with_header(["Operator", "#Hosts", "#Inst",
+                                             "Avg Time", "Max Time", "#Rows",
+                                             "Est. #Rows", "Peak Mem",
                                              "Est. Peak Mem", "Detail"])
 
   def _execute_stmt(self, query_str, is_dml=False, print_web_link=False):
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test
index d0b1a2a..800fb67 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test
@@ -41,12 +41,12 @@
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4 .*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 12 .*alltypessmall.*
-row_regex:.*03:SCAN HDFS * 12 .*alltypestiny.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypes.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 12 .*alltypessmall.*
+row_regex:.*03:SCAN HDFS * 3 * 12 .*alltypestiny.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypes.*
 ====
 ---- QUERY
 # Same idea, but with smallest scan first to check that the scheduler is taking the
@@ -61,12 +61,12 @@
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4 .*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 12 .*alltypessmall.*
-row_regex:.*03:SCAN HDFS * 12 .*alltypes.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypestiny.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 12 .*alltypessmall.*
+row_regex:.*03:SCAN HDFS * 3 * 12 .*alltypes.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypestiny.*
 ====
 ---- QUERY
 # This query should have one scan and one exchange in the interior fragment.
@@ -80,13 +80,13 @@
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4.*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*06:AGGREGATE * 12 .*
-row_regex:.*03:AGGREGATE * 12 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 12 .*alltypes.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypestiny.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*06:AGGREGATE * 3 * 12 .*
+row_regex:.*03:AGGREGATE * 3 * 12 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 12 .*alltypes.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypestiny.*
 ====
 ---- QUERY
 # This query should have one scan and one exchange in the interior fragment.
@@ -102,13 +102,13 @@
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4.*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*06:AGGREGATE * 12 .*
-row_regex:.*03:AGGREGATE * 4 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 4 .*alltypestiny.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypes.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*06:AGGREGATE * 3 * 12 .*
+row_regex:.*03:AGGREGATE * 3 * 4 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 4 .*alltypestiny.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypes.*
 ====
 ---- QUERY
 # This query should have one scan and two exchanges in the interior fragment.
@@ -123,11 +123,11 @@
 ---- RESULTS
 '0'
 ---- RUNTIME_PROFILE
-row_regex:.*F06:ROOT * 1 .*
+row_regex:.*F06:ROOT * 1 * 1 .*
 row_regex:.*AdmissionSlots: 2.*
-row_regex:.*00:UNION * 6 .*
-row_regex:.*08:AGGREGATE * 6 .*
-row_regex:.*03:AGGREGATE * 6 .*
-row_regex:.*04:SCAN HDFS * 6 .*
-row_regex:.*01:SCAN HDFS * 6 .*
+row_regex:.*00:UNION * 3 * 6 .*
+row_regex:.*08:AGGREGATE * 3 * 6 .*
+row_regex:.*03:AGGREGATE * 3 * 6 .*
+row_regex:.*04:SCAN HDFS * 3 * 6 .*
+row_regex:.*01:SCAN HDFS * 3 * 6 .*
 ====
diff --git a/tests/benchmark/report_benchmark_results.py b/tests/benchmark/report_benchmark_results.py
index 5312f01..0274e64 100755
--- a/tests/benchmark/report_benchmark_results.py
+++ b/tests/benchmark/report_benchmark_results.py
@@ -66,6 +66,7 @@
 NAME = 'name'
 NUM_CLIENTS = 'num_clients'
 NUM_HOSTS = 'num_hosts'
+NUM_INSTANCES = 'num_instances'
 NUM_ROWS = 'num_rows'
 OPERATOR = 'operator'
 PEAK_MEM = 'peak_mem'
@@ -687,6 +688,7 @@
     table = prettytable.PrettyTable(
         ["Operator",
           "#Hosts",
+          "#Inst"
           "Avg Time",
           "Std Dev",
           "Max Time",
@@ -698,6 +700,7 @@
     for row in self.rows:
       table_row = [ row[PREFIX] + row[OPERATOR],
           prettyprint_values(row[NUM_HOSTS]),
+          prettyprint_values(row[NUM_INSTANCES]),
           prettyprint_time(row[AVG_TIME]),
           prettyprint_time(row[STDDEV_TIME]),
           prettyprint_time(row[MAX_TIME]),
@@ -805,7 +808,7 @@
         ref_row = self.ref_combined_summary.rows[i]
 
         comparison_row = {}
-        for key in [PREFIX, OPERATOR, NUM_HOSTS, AVG_TIME, STDDEV_TIME,
+        for key in [PREFIX, OPERATOR, NUM_HOSTS, NUM_INSTANCES, AVG_TIME, STDDEV_TIME,
             MAX_TIME, PEAK_MEM, NUM_ROWS, EST_NUM_ROWS, EST_PEAK_MEM, DETAIL]:
           comparison_row[key] = row[key]
 
@@ -859,6 +862,7 @@
             comparison_row[RSTD], comparison_row[REF_RSTD])
 
         comparison_row[NUM_HOSTS] = row[NUM_HOSTS]
+        comparison_row[NUM_INSTANCES] = row[NUM_INSTANCES]
         comparison_row[NUM_ROWS] = row[NUM_ROWS]
         comparison_row[EST_NUM_ROWS] = row[EST_NUM_ROWS]
 
@@ -891,6 +895,7 @@
           'Base StdDev(%)',
           'Delta(StdDev(%))',
           '#Hosts',
+          '#Inst',
           '#Rows',
           'Est #Rows'])
     table.align = 'l'
@@ -903,6 +908,7 @@
           '{0:.2%}'.format(row[REF_RSTD]),
           '{0:+.2%}'.format(row[DELTA_RSTD]),
           prettyprint_values(row[NUM_HOSTS]),
+          prettyprint_values(row[NUM_INSTANCES]),
           prettyprint_values(row[NUM_ROWS]),
           prettyprint_values(row[EST_NUM_ROWS]) ]
 
@@ -943,6 +949,7 @@
           'Base Max',
           'Delta(Max)',
           '#Hosts',
+          '#Inst',
           '#Rows',
           'Est #Rows'])
     table.align = 'l'
@@ -961,6 +968,7 @@
             prettyprint_time(row[BASELINE_MAX]),
             prettyprint_percent(row[DELTA_MAX]),
             prettyprint_values(row[NUM_HOSTS]),
+            prettyprint_values(row[NUM_INSTANCES]),
             prettyprint_values(row[NUM_ROWS]),
             prettyprint_values(row[EST_NUM_ROWS])]
 
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index 718d934..3090cf7 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -327,7 +327,7 @@
     args = ['-p', '-q', 'select 1; profile;']
     result_set = run_impala_shell_cmd(vector, args)
     # This regex helps us uniquely identify a profile.
-    regex = re.compile("Operator\s+#Hosts\s+Avg\s+Time")
+    regex = re.compile("Operator\s+#Hosts\s+#Inst\s+Avg\s+Time")
     # We expect two query profiles.
     assert len(re.findall(regex, result_set.stdout)) == 2, \
         "Could not detect two profiles, stdout: %s" % result_set.stdout