IMPALA-9046: Profile counter that indicates if a JVM pause occurred
Adds a new section to the host profiles that includes JVM GC related
metrics. These metrics are taken from JMX and the JvmPauseMonitor.
The host profiles will now include a section like below:
JVM:
- GcCount: 19
- GcNumInfoThresholdExceeded: 0
- GcNumWarnThresholdExceeded: 0
- GcTimeMillis: 17s476ms
- GcTotalExtraSleepTimeMillis: 380
GcNumInfoThresholdExceeded, GcNumWarnThresholdExceeded, and
GcTotalExtraSleepTimeMillis are all taken from JvmPauseMonitor.
GcCount and GcTimeMillis are taken from JMX (specifically,
GarbageCollectorMXBean).
The counters themselves are derived from the impalad host-level metrics.
Changed the 'lock_' in JvmMetricCache (in memory-metrics.h) from a mutex
to a shared_mutex. Most accessors of the JvmMetricCache member variables
are read-only. A write only occurs lazily at most every second. This
should help reduce lock contention on JvmMetricCache now that all
queries will start accessing info stored by the JvmMetricCache.
Testing:
* Ran core tests
* Added a test that runs Java UDF, which triggers JVM GC
Change-Id: Idbaae2f9142b8be94532a0a147668a3d96091b0b
Reviewed-on: http://gerrit.cloudera.org:8080/16414
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
Tested-by: Sahil Takiar <stakiar@cloudera.com>
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5bffa6e..a6ded7e 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -48,6 +48,7 @@
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
+#include "util/memory-metrics.h"
#include "util/metrics.h"
#include "util/system-state-info.h"
#include "util/thread.h"
@@ -71,6 +72,26 @@
namespace impala {
+PROFILE_DEFINE_DERIVED_COUNTER(GcCount, STABLE_LOW, TUnit::UNIT,
+ "Per-Impalad Counter: The number of GC collections that have occurred in the Impala "
+ "process over the duration of the query. Reported by JMX.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcTimeMillis, STABLE_LOW, TUnit::TIME_MS,
+ "Per-Impalad Counter: The amount of time spent in GC in the Impala process over the "
+ "duration of the query. Reported by JMX.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcNumWarnThresholdExceeded, STABLE_LOW,
+ TUnit::UNIT,
+ "Per-Impalad Counter: The number of JVM process pauses that occurred in this Impala "
+ "process over the duration of the query. Tracks the number of pauses at the WARN "
+ "threshold. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcNumInfoThresholdExceeded, STABLE_LOW,
+ TUnit::UNIT,
+ "Per-Impalad Counter: The number of JVM process pauses that occurred in this Impala "
+ "process over the duration of the query. Tracks the number of pauses at the INFO "
+ "threshold. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor.");
+PROFILE_DEFINE_DERIVED_COUNTER(GcTotalExtraSleepTimeMillis, STABLE_LOW, TUnit::TIME_MS,
+ "Per-Impalad Counter: The amount of time the JVM process paused over the duration "
+ "of the query. See JvmPauseMonitor for details. Reported by the JvmPauseMonitor.");
+
QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -159,6 +180,45 @@
ExecEnv* exec_env = ExecEnv::GetInstance();
+ RuntimeProfile* jvm_host_profile = RuntimeProfile::Create(&obj_pool_, "JVM");
+ host_profile_->AddChild(jvm_host_profile);
+
+ int64_t gc_count = JvmMemoryCounterMetric::GC_COUNT->GetValue();
+ PROFILE_GcCount.Instantiate(jvm_host_profile,
+ [gc_count]() {
+ return JvmMemoryCounterMetric::GC_COUNT->GetValue() - gc_count;
+ });
+
+ int64_t gc_time_millis = JvmMemoryCounterMetric::GC_TIME_MILLIS->GetValue();
+ PROFILE_GcTimeMillis.Instantiate(jvm_host_profile,
+ [gc_time_millis]() {
+ return JvmMemoryCounterMetric::GC_TIME_MILLIS->GetValue() - gc_time_millis;
+ });
+
+ int64_t gc_num_warn_threshold_exceeded =
+ JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED->GetValue();
+ PROFILE_GcNumWarnThresholdExceeded.Instantiate(jvm_host_profile,
+ [gc_num_warn_threshold_exceeded]() {
+ return JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED->GetValue()
+ - gc_num_warn_threshold_exceeded;
+ });
+
+ int64_t gc_num_info_threshold_exceeded =
+ JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED->GetValue();
+ PROFILE_GcNumInfoThresholdExceeded.Instantiate(jvm_host_profile,
+ [gc_num_info_threshold_exceeded]() {
+ return JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED->GetValue()
+ - gc_num_info_threshold_exceeded;
+ });
+
+ int64_t gc_total_extra_sleep_time_millis =
+ JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS->GetValue();
+ PROFILE_GcTotalExtraSleepTimeMillis.Instantiate(jvm_host_profile,
+ [gc_total_extra_sleep_time_millis]() {
+ return JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS->GetValue()
+ - gc_total_extra_sleep_time_millis;
+ });
+
// Initialize resource tracking counters.
if (query_ctx().trace_resource_usage) {
SystemStateInfo* system_state_info = exec_env->system_state_info();
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index ccfc6b5..1a8ec9b 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -29,7 +29,10 @@
#include "util/time.h"
using boost::algorithm::to_lower;
-using std::lock_guard;
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::unique_lock;
+
using namespace impala;
using namespace strings;
@@ -61,6 +64,13 @@
JvmMemoryMetric* JvmMemoryMetric::HEAP_MAX_USAGE = nullptr;
JvmMemoryMetric* JvmMemoryMetric::NON_HEAP_COMMITTED = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_COUNT = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_TIME_MILLIS = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED = nullptr;
+JvmMemoryCounterMetric* JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS =
+ nullptr;
+
BufferPoolMetric* BufferPoolMetric::LIMIT = nullptr;
BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
BufferPoolMetric* BufferPoolMetric::RESERVED = nullptr;
@@ -209,22 +219,37 @@
JvmMemoryMetric::CreateAndRegister(
metrics, "jvm.$0.peak-init-usage-bytes", name, PEAK_INIT);
}
- JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_time_millis",
- [](const TGetJvmMemoryMetricsResponse& r) { return r.gc_time_millis; });
- JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_num_info_threshold_exceeded",
- [](const TGetJvmMemoryMetricsResponse& r) {
- return r.gc_num_info_threshold_exceeded;
- });
- JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_num_warn_threshold_exceeded",
- [](const TGetJvmMemoryMetricsResponse& r) {
- return r.gc_num_warn_threshold_exceeded;
- });
- JvmMemoryCounterMetric::CreateAndRegister(metrics, "jvm.gc_count",
- [](const TGetJvmMemoryMetricsResponse& r) { return r.gc_count; });
- JvmMemoryCounterMetric::CreateAndRegister(metrics,
- "jvm.gc_total_extra_sleep_time_millis", [](const TGetJvmMemoryMetricsResponse& r) {
- return r.gc_total_extra_sleep_time_millis;
- });
+
+ JvmMemoryCounterMetric::GC_TIME_MILLIS =
+ JvmMemoryCounterMetric::CreateAndRegister(metrics,
+ "jvm.gc_time_millis",
+ [](const TGetJvmMemoryMetricsResponse& r) {
+ return r.gc_time_millis;
+ });
+ JvmMemoryCounterMetric::GC_NUM_INFO_THRESHOLD_EXCEEDED =
+ JvmMemoryCounterMetric::CreateAndRegister(metrics,
+ "jvm.gc_num_info_threshold_exceeded",
+ [](const TGetJvmMemoryMetricsResponse& r) {
+ return r.gc_num_info_threshold_exceeded;
+ });
+ JvmMemoryCounterMetric::GC_NUM_WARN_THRESHOLD_EXCEEDED =
+ JvmMemoryCounterMetric::CreateAndRegister(metrics,
+ "jvm.gc_num_warn_threshold_exceeded",
+ [](const TGetJvmMemoryMetricsResponse& r) {
+ return r.gc_num_warn_threshold_exceeded;
+ });
+ JvmMemoryCounterMetric::GC_COUNT =
+ JvmMemoryCounterMetric::CreateAndRegister(metrics,
+ "jvm.gc_count",
+ [](const TGetJvmMemoryMetricsResponse& r) {
+ return r.gc_count;
+ });
+ JvmMemoryCounterMetric::GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS =
+ JvmMemoryCounterMetric::CreateAndRegister(metrics,
+ "jvm.gc_total_extra_sleep_time_millis",
+ [](const TGetJvmMemoryMetricsResponse& r) {
+ return r.gc_total_extra_sleep_time_millis;
+ });
initialized_ = true;
}
@@ -251,6 +276,19 @@
}
void JvmMetricCache::GrabMetricsIfNecessary() {
+ // Acquire the read lock and check if the cache timeout has expired. Acquiring the read
+ // lock should be cheap, so optimize for the fast path where the timeout has not been
+ // hit.
+ {
+ shared_lock<shared_mutex> shared_l(lock_);
+ int64_t now = MonotonicMillis();
+ if (now - last_fetch_ < CACHE_PERIOD_MILLIS) return;
+ }
+
+ // If the timeout has been hit, then refresh 'last_response_'. Since the read lock is
+ // released before the write lock is acquired, it is possible another thread
+ // already refreshed 'last_response_' so check the value of 'last_fetch_' again.
+ unique_lock<shared_mutex> unique_l(lock_);
int64_t now = MonotonicMillis();
if (now - last_fetch_ < CACHE_PERIOD_MILLIS) return;
Status status = JniUtil::GetJvmMemoryMetrics(&last_response_);
@@ -264,16 +302,16 @@
int64_t JvmMetricCache::GetCounterMetric(
int64_t(*accessor)(const TGetJvmMemoryMetricsResponse&)) {
- lock_guard<std::mutex> lock_guard(lock_);
GrabMetricsIfNecessary();
+ shared_lock<shared_mutex> shared_l(lock_);
return accessor(last_response_);
}
int64_t JvmMetricCache::GetPoolMetric(const std::string& mempool_name,
JvmMemoryMetricType type) {
- lock_guard<std::mutex> lock_guard(lock_);
GrabMetricsIfNecessary();
+ shared_lock<shared_mutex> shared_l(lock_);
for (const TJvmMemoryPool& pool : last_response_.memory_pools) {
if (pool.name == mempool_name) {
switch (type) {
@@ -303,9 +341,9 @@
}
vector<string> JvmMetricCache::GetPoolNames() {
- lock_guard<std::mutex> lock_guard(lock_);
GrabMetricsIfNecessary();
vector<string> names;
+ shared_lock<shared_mutex> shared_l(lock_);
for (const TJvmMemoryPool& usage: last_response_.memory_pools) {
names.push_back(usage.name);
}
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index e3b9f51..977e44d 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -19,7 +19,7 @@
#include "util/metrics.h"
-#include <mutex>
+#include <boost/thread/shared_mutex.hpp>
#include <gperftools/malloc_extension.h>
#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
#include <sanitizer/allocator_interface.h>
@@ -179,13 +179,18 @@
static JvmMetricCache* GetInstance();
private:
- /// Updates metrics if over CACHE_PERIOD_MILLIS has elapsed.
+ /// Updates metrics if over CACHE_PERIOD_MILLIS has elapsed. Thread safe.
void GrabMetricsIfNecessary();
- std::mutex lock_;
+ /// A shared lock that allows low-overhead reads of last_fetch_ and last_response_.
+ /// The write lock is only acquired when the CACHE_PERIOD_MILLIS timeout expires and
+ /// last_response_ and last_fetch_ need to be updated.
+ boost::shared_mutex lock_;
+
/// Time when metrics were last fetched, using MonotonicMillis().
/// Protected by lock_.
int64_t last_fetch_ = 0;
+
/// Last available metrics.
/// Protected by lock_.
TGetJvmMemoryMetricsResponse last_response_;
@@ -243,7 +248,14 @@
// A counter that represents metrics about JVM Memory. It acesses the underlying
// data via JniUtil::GetJvmMemoryMetrics() via JvmMetricCache.
class JvmMemoryCounterMetric : public IntCounter {
- virtual int64_t GetValue() override;
+ public:
+ virtual int64_t GetValue() override;
+
+ static JvmMemoryCounterMetric* GC_COUNT;
+ static JvmMemoryCounterMetric* GC_TIME_MILLIS;
+ static JvmMemoryCounterMetric* GC_NUM_WARN_THRESHOLD_EXCEEDED;
+ static JvmMemoryCounterMetric* GC_NUM_INFO_THRESHOLD_EXCEEDED;
+ static JvmMemoryCounterMetric* GC_TOTAL_EXTRA_SLEEP_TIME_MILLIS;
private:
friend class JvmMemoryMetric;
diff --git a/tests/custom_cluster/test_observability.py b/tests/custom_cluster/test_observability.py
new file mode 100644
index 0000000..4a4a5a9
--- /dev/null
+++ b/tests/custom_cluster/test_observability.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import re
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.parse_util import parse_duration_string_ms
+
+
+class TestObservability(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ @pytest.mark.execute_serially
+ def test_host_profile_jvm_gc_metrics(self, unique_database):
+ self.execute_query_expect_success(self.client,
+ "create function {0}.gc(int) returns int location '{1}' \
+ symbol='org.apache.impala.JavaGcUdfTest'".format(
+ unique_database, get_fs_path('/test-warehouse/impala-hive-udfs.jar')))
+ profile = self.execute_query_expect_success(self.client,
+ "select {0}.gc(int_col) from functional.alltypes limit 1000".format(
+ unique_database)).runtime_profile
+
+ gc_count_regex = "GcCount:.*\((.*)\)"
+ gc_count_match = re.search(gc_count_regex, profile)
+ assert gc_count_match, profile
+ assert int(gc_count_match.group(1)) > 0, profile
+
+ gc_time_millis_regex = "GcTimeMillis: (.*)"
+ gc_time_millis_match = re.search(gc_time_millis_regex, profile)
+ assert gc_time_millis_match, profile
+ assert parse_duration_string_ms(gc_time_millis_match.group(1)) > 0
diff --git a/tests/test-hive-udfs/src/main/java/org/apache/impala/JavaGcUdfTest.java b/tests/test-hive-udfs/src/main/java/org/apache/impala/JavaGcUdfTest.java
new file mode 100644
index 0000000..62d5027
--- /dev/null
+++ b/tests/test-hive-udfs/src/main/java/org/apache/impala/JavaGcUdfTest.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.IntWritable;
+
+import java.lang.System;
+
+public class JavaGcUdfTest extends UDF {
+ public JavaGcUdfTest() {}
+
+ public IntWritable evaluate(IntWritable a) {
+ if (a == null) return null;
+ gc();
+ return new IntWritable(a.get());
+ }
+
+ private void gc() {
+ for (int i = 0; i < 100; i++) {
+ int[] gc_array = new int[100];
+ }
+ System.gc();
+ }
+}