IMPALA-9046: Profile counter that indicates if a JVM pause occurred

Adds a new section to the host profiles that includes JVM GC related
metrics. These metrics are taken from JMX and the JvmPauseMonitor.

The host profiles will now include a section like below:

        JVM:
           - GcCount: 19
           - GcNumInfoThresholdExceeded: 0
           - GcNumWarnThresholdExceeded: 0
           - GcTimeMillis: 17s476ms
           - GcTotalExtraSleepTimeMillis: 380

GcNumInfoThresholdExceeded, GcNumWarnThresholdExceeded, and
GcTotalExtraSleepTimeMillis are all taken from JvmPauseMonitor.
GcCount and GcTimeMillis are taken from JMX (specifically,
GarbageCollectorMXBean).

The counters themselves are derived from the impalad host-level metrics.

Changed the 'lock_' in JvmMetricCache (in memory-metrics.h) from a mutex
to a shared_mutex. Most accessors of the JvmMetricCache member variables
are read-only. A write only occurs lazily at most every second. This
should help reduce lock contention on JvmMetricCache now that all
queries will start accessing info stored by the JvmMetricCache.

Testing:
* Ran core tests
* Added a test that runs Java UDF, which triggers JVM GC

Change-Id: Idbaae2f9142b8be94532a0a147668a3d96091b0b
Reviewed-on: http://gerrit.cloudera.org:8080/16414
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
Tested-by: Sahil Takiar <stakiar@cloudera.com>
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5bffa6e..a6ded7e 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -48,6 +48,7 @@
 #include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/memory-metrics.h"
 #include "util/metrics.h"
 #include "util/system-state-info.h"
 #include "util/thread.h"
@@ -71,6 +72,26 @@
 
 namespace impala {
 
+PROFILE_DEFINE_DERIVED_COUNTER(GcCount, STABLE_LOW, TUnit::UNIT,
+    "Per-Impalad Counter: The number of GC collections that have occurred in the Impala "
+    "process over the duration of the query. Reported by JMX.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcTimeMillis, STABLE_LOW, TUnit::TIME_MS,
+    "Per-Impalad Counter: The amount of time spent in GC in the Impala process over the "
+    "duration of the query. Reported by JMX.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcNumWarnThresholdExceeded, STABLE_LOW,
+    TUnit::UNIT,
+    "Per-Impalad Counter: The number of JVM process pauses that occurred in this Impala "
+    "process over the duration of the query. Tracks the number of pauses at the WARN "
+    "threshold. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcNumInfoThresholdExceeded, STABLE_LOW,
+    TUnit::UNIT,
+    "Per-Impalad Counter: The number of JVM process pauses that occurred in this Impala "
+    "process over the duration of the query. Tracks the number of pauses at the INFO "
+    "threshold. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcTotalExtraSleepTimeMillis, STABLE_LOW, TUnit::TIME_MS,
+    "Per-Impalad Counter: The amount of time the JVM process paused over the duration "
+    "of the query. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor.");
+
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
   DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -159,6 +180,45 @@
 
   ExecEnv* exec_env = ExecEnv::GetInstance();
 
+  RuntimeProfile* jvm_host_profile = RuntimeProfile::Create(&obj_pool_, "JVM");
+  host_profile_->AddChild(jvm_host_profile);
+
+  int64_t gc_count = JvmMemoryCounterMetric::GC_COUNT->GetValue();
+  PROFILE_GcCount.Instantiate(jvm_host_profile,
+      [gc_count]() {
+        return JvmMemoryCounterMetric::GC_COUNT->GetValue() - gc_count;
+      });
+
+  int64_t gc_time_millis = JvmMemoryCounterMetric::GC_TIME_MILLIS->GetValue();
+  PROFILE_GcTimeMillis.Instantiate(jvm_host_profile,
+      [gc_time_millis]() {
+        return JvmMemoryCounterMetric::GC_TIME_MILLIS->GetValue() - gc_time_millis;
+      });
+
+  int64_t gc_num_warn_threshold_exceeded =
+      JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED->GetValue();
+  PROFILE_GcNumWarnThresholdExceeded.Instantiate(jvm_host_profile,
+      [gc_num_warn_threshold_exceeded]() {
+        return JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED->GetValue()
+            - gc_num_warn_threshold_exceeded;
+      });
+
+  int64_t gc_num_info_threshold_exceeded =
+      JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED->GetValue();
+  PROFILE_GcNumInfoThresholdExceeded.Instantiate(jvm_host_profile,
+      [gc_num_info_threshold_exceeded]() {
+        return JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED->GetValue()
+            - gc_num_info_threshold_exceeded;
+      });
+
+  int64_t gc_total_extra_sleep_time_millis =
+      JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS->GetValue();
+  PROFILE_GcTotalExtraSleepTimeMillis.Instantiate(jvm_host_profile,
+      [gc_total_extra_sleep_time_millis]() {
+        return JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS->GetValue()
+            - gc_total_extra_sleep_time_millis;
+      });
+
   // Initialize resource tracking counters.
   if (query_ctx().trace_resource_usage) {
     SystemStateInfo* system_state_info = exec_env->system_state_info();
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index ccfc6b5..1a8ec9b 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -29,7 +29,10 @@
 #include "util/time.h"
 
 using boost::algorithm::to_lower;
-using std::lock_guard;
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::unique_lock;
+
 using namespace impala;
 using namespace strings;
 
@@ -61,6 +64,13 @@
 JvmMemoryMetric* JvmMemoryMetric::HEAP_MAX_USAGE = nullptr;
 JvmMemoryMetric* JvmMemoryMetric::NON_HEAP_COMMITTED = nullptr;
 
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_COUNT = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_TIME_MILLIS = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS =
+    nullptr;
+
 BufferPoolMetric* BufferPoolMetric::LIMIT = nullptr;
 BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
 BufferPoolMetric* BufferPoolMetric::RESERVED = nullptr;
@@ -209,22 +219,37 @@
     JvmMemoryMetric::CreateAndRegister(
         metrics, "jvm.$0.peak-init-usage-bytes", name, PEAK_INIT);
   }
-  JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_time_millis",
-      [](const TGetJvmMemoryMetricsResponse& r) { return r.gc_time_millis; });
-  JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_num_info_threshold_exceeded",
-      [](const TGetJvmMemoryMetricsResponse& r) {
-      return r.gc_num_info_threshold_exceeded;
-  });
-  JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_num_warn_threshold_exceeded",
-      [](const TGetJvmMemoryMetricsResponse& r) {
-      return r.gc_num_warn_threshold_exceeded;
-  });
-  JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_count",
-      [](const TGetJvmMemoryMetricsResponse& r) { return r.gc_count; });
-  JvmMemoryCounterMetric::CreateAndRegister(metrics,
-      "jvm.gc_total_extra_sleep_time_millis", [](const TGetJvmMemoryMetricsResponse& r) {
-      return r.gc_total_extra_sleep_time_millis;
-  });
+
+  JvmMemoryCounterMetric::GC_TIME_MILLIS =
+      JvmMemoryCounterMetric::CreateAndRegister(metrics,
+          "jvm.gc_time_millis",
+          [](const TGetJvmMemoryMetricsResponse& r) {
+          return r.gc_time_millis;
+          });
+  JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED =
+      JvmMemoryCounterMetric::CreateAndRegister(metrics,
+          "jvm.gc_num_info_threshold_exceeded",
+          [](const TGetJvmMemoryMetricsResponse& r) {
+            return r.gc_num_info_threshold_exceeded;
+          });
+  JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED =
+      JvmMemoryCounterMetric::CreateAndRegister(metrics,
+          "jvm.gc_num_warn_threshold_exceeded",
+          [](const TGetJvmMemoryMetricsResponse& r) {
+            return r.gc_num_warn_threshold_exceeded;
+          });
+  JvmMemoryCounterMetric::GC_COUNT =
+    JvmMemoryCounterMetric::CreateAndRegister(metrics,
+          "jvm.gc_count",
+          [](const TGetJvmMemoryMetricsResponse& r) {
+            return r.gc_count;
+          });
+  JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS =
+      JvmMemoryCounterMetric::CreateAndRegister(metrics,
+          "jvm.gc_total_extra_sleep_time_millis",
+          [](const TGetJvmMemoryMetricsResponse& r) {
+            return r.gc_total_extra_sleep_time_millis;
+          });
   initialized_ = true;
 }
 
@@ -251,6 +276,19 @@
 }
 
 void JvmMetricCache::GrabMetricsIfNecessary() {
+  // Acquire the read lock and check if the cache timeout has expired. Acquiring the read
+  // lock should be cheap, so optimize for the fast path where the timeout has not been
+  // hit.
+  {
+    shared_lock<shared_mutex> shared_l(lock_);
+    int64_t now = MonotonicMillis();
+    if (now - last_fetch_ < CACHE_PERIOD_MILLIS) return;
+  }
+
+  // If the timeout has been hit, then refresh 'last_response_'. Since the read lock is
+  // released before the write lock is acquired, it is possible another thread
+  // already refreshed 'last_response_' so check the value of 'last_fetch_' again.
+  unique_lock<shared_mutex> unique_l(lock_);
   int64_t now = MonotonicMillis();
   if (now - last_fetch_ < CACHE_PERIOD_MILLIS) return;
   Status status = JniUtil::GetJvmMemoryMetrics(&last_response_);
@@ -264,16 +302,16 @@
 
 int64_t JvmMetricCache::GetCounterMetric(
     int64_t(*accessor)(const TGetJvmMemoryMetricsResponse&)) {
-  lock_guard<std::mutex> lock_guard(lock_);
   GrabMetricsIfNecessary();
+  shared_lock<shared_mutex> shared_l(lock_);
   return accessor(last_response_);
 }
 
 int64_t JvmMetricCache::GetPoolMetric(const std::string& mempool_name,
     JvmMemoryMetricType type) {
-  lock_guard<std::mutex> lock_guard(lock_);
   GrabMetricsIfNecessary();
 
+  shared_lock<shared_mutex> shared_l(lock_);
   for (const TJvmMemoryPool& pool : last_response_.memory_pools) {
     if (pool.name == mempool_name) {
       switch (type) {
@@ -303,9 +341,9 @@
 }
 
 vector<string> JvmMetricCache::GetPoolNames() {
-  lock_guard<std::mutex> lock_guard(lock_);
   GrabMetricsIfNecessary();
   vector<string> names;
+  shared_lock<shared_mutex> shared_l(lock_);
   for (const TJvmMemoryPool& usage: last_response_.memory_pools) {
     names.push_back(usage.name);
   }
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index e3b9f51..977e44d 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -19,7 +19,7 @@
 
 #include "util/metrics.h"
 
-#include <mutex>
+#include <boost/thread/shared_mutex.hpp>
 #include <gperftools/malloc_extension.h>
 #if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
 #include <sanitizer/allocator_interface.h>
@@ -179,13 +179,18 @@
     static JvmMetricCache* GetInstance();
 
   private:
-    /// Updates metrics if over CACHE_PERIOD_MILLIS has elapsed.
+    /// Updates metrics if over CACHE_PERIOD_MILLIS has elapsed. Thread safe.
     void GrabMetricsIfNecessary();
 
-    std::mutex lock_;
+    /// A shared lock that allows low-overhead reads of last_fetch_ and last_response_.
+    /// The write lock is only acquired when the CACHE_PERIOD_MILLIS timeout expires and
+    /// last_response_ and last_fetch_ need to be updated.
+    boost::shared_mutex lock_;
+
     /// Time when metrics were last fetched, using MonotonicMillis().
     /// Protected by lock_.
     int64_t last_fetch_ = 0;
+
     /// Last available metrics.
     /// Protected by lock_.
     TGetJvmMemoryMetricsResponse last_response_;
@@ -243,7 +248,14 @@
 // A counter that represents metrics about JVM Memory. It acesses the underlying
 // data via JniUtil::GetJvmMemoryMetrics() via JvmMetricCache.
 class JvmMemoryCounterMetric : public IntCounter {
-  virtual int64_t GetValue() override;
+  public:
+    virtual int64_t GetValue() override;
+
+    static JvmMemoryCounterMetric* GC_COUNT;
+    static JvmMemoryCounterMetric* GC_TIME_MILLIS;
+    static JvmMemoryCounterMetric* GC_NUM_WARN_THRESHOLD_EXCEEDED;
+    static JvmMemoryCounterMetric* GC_NUM_INFO_THRESHOLD_EXCEEDED;
+    static JvmMemoryCounterMetric* GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS;
 
   private:
     friend class JvmMemoryMetric;
diff --git a/tests/custom_cluster/test_observability.py b/tests/custom_cluster/test_observability.py
new file mode 100644
index 0000000..4a4a5a9
--- /dev/null
+++ b/tests/custom_cluster/test_observability.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import re
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.parse_util import parse_duration_string_ms
+
+
+class TestObservability(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  def test_host_profile_jvm_gc_metrics(self, unique_database):
+    self.execute_query_expect_success(self.client,
+        "create function {0}.gc(int) returns int location '{1}' \
+        symbol='org.apache.impala.JavaGcUdfTest'".format(
+            unique_database, get_fs_path('/test-warehouse/impala-hive-udfs.jar')))
+    profile = self.execute_query_expect_success(self.client,
+        "select {0}.gc(int_col) from functional.alltypes limit 1000".format(
+            unique_database)).runtime_profile
+
+    gc_count_regex = "GcCount:.*\((.*)\)"
+    gc_count_match = re.search(gc_count_regex, profile)
+    assert gc_count_match, profile
+    assert int(gc_count_match.group(1)) > 0, profile
+
+    gc_time_millis_regex = "GcTimeMillis: (.*)"
+    gc_time_millis_match = re.search(gc_time_millis_regex, profile)
+    assert gc_time_millis_match, profile
+    assert parse_duration_string_ms(gc_time_millis_match.group(1)) > 0
diff --git a/tests/test-hive-udfs/src/main/java/org/apache/impala/JavaGcUdfTest.java b/tests/test-hive-udfs/src/main/java/org/apache/impala/JavaGcUdfTest.java
new file mode 100644
index 0000000..62d5027
--- /dev/null
+++ b/tests/test-hive-udfs/src/main/java/org/apache/impala/JavaGcUdfTest.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.IntWritable;
+
+import java.lang.System;
+
+public class JavaGcUdfTest extends UDF {
+  public JavaGcUdfTest() {}
+
+  public IntWritable evaluate(IntWritable a) {
+    if (a == null) return null;
+    gc();
+    return new IntWritable(a.get());
+  }
+
+  private void gc() {
+    for (int i = 0; i < 100; i++) {
+      int[] gc_array = new int[100];
+    }
+    System.gc();
+  }
+}