blob: ee684700707040d11205fc7e19b6b058dd29cbaf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/thread.h"
#include <set>
#include <map>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include "common/thread-debug-info.h"
#include "util/coding-util.h"
#include "util/debug-util.h"
#include "util/error-util.h"
#include "util/jni-util.h"
#include "util/metrics.h"
#include "util/webserver.h"
#include "util/os-util.h"
#include "common/names.h"
#ifndef NDEBUG
DECLARE_bool(thread_creation_fault_injection);
#endif
namespace this_thread = boost::this_thread;
using namespace rapidjson;
namespace impala {
static const string JVM_THREADS_WEB_PAGE = "/jvm-threadz";
static const string JVM_THREADS_TEMPLATE = "jvm-threadz.tmpl";
static const string THREADS_WEB_PAGE = "/threadz";
static const string THREADS_TEMPLATE = "threadz.tmpl";
static const string THREAD_GROUP_WEB_PAGE = "/thread-group";
static const string THREAD_GROUP_TEMPLATE = "/thread-group.tmpl";
class ThreadMgr;
// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
// The Thread class adds a reference to thread_manager while it is supervising a thread so
// that a race between the end of the process's main thread (and therefore the destruction
// of thread_manager) and the end of a thread that tries to remove itself from the
// manager after the destruction can be avoided.
shared_ptr<ThreadMgr> thread_manager;
namespace {
// Example output:
// "overview" : {
// "thread_count" : 30,
// "daemon_count" : 4,
// "peak_count" : 40
// }
// "threads": [
// {
// "summary" : "main ID:1 RUNNABLE",
// "cpu_time_sec" : 1.303,
// "user_time_sec" : 2.323,
// "blocked_time_ms" : -1,
// "blocked_count" : 20,
// "is_native" : false
// },
// { ... }
// ]
void JvmThreadsUrlCallback(const Webserver::WebRequest& req, Document* doc);
void ThreadOverviewUrlCallback(bool include_jvm_threads,
const Webserver::WebRequest& req, Document* document);
}
// A singleton class that tracks all live threads, and groups them together for easy
// auditing. Used only by Thread.
class ThreadMgr {
public:
ThreadMgr() : metrics_enabled_(false) { }
Status StartInstrumentation(MetricGroup* metrics);
// Registers a thread to the supplied category. The key is a boost::thread::id, used
// instead of the system TID since boost::thread::id is always available, unlike
// gettid() which might fail.
void AddThread(const thread::id& thread, const string& name, const string& category,
int64_t tid);
// Removes a thread from the supplied category. If the thread has
// already been removed, this is a no-op.
void RemoveThread(const thread::id& boost_id, const string& category);
// Example output:
// "total_threads": 144,
// "thread-groups": [
// {
// "name": "common",
// "size": 1
// },
// {
// "name": "disk-io-mgr",
// "size": 2
// },
// {
// "name": "hdfs-worker-pool",
// "size": 16
// },
// ... etc ...
// ]
void GetThreadOverview(Document* document);
// Example output:
// "thread-group": {
// "category": "disk-io-mgr",
// "size": 2
// },
// "threads": [
// {
// "name": "work-loop(Disk: 0, Thread: 0)-17049",
// "user_ns": 0,
// "kernel_ns": 0,
// "iowait_ns": 0
// },
// {
// "name": "work-loop(Disk: 1, Thread: 0)-17050",
// "user_ns": 0,
// "kernel_ns": 0,
// "iowait_ns": 0
// }
// ]
void ThreadGroupUrlCallback(const Webserver::WebRequest& req, Document* output);
private:
// Container class for any details we want to capture about a thread
// TODO: Add start-time.
// TODO: Track fragment ID.
class ThreadDescriptor {
public:
ThreadDescriptor() { }
ThreadDescriptor(const string& category, const string& name, int64_t thread_id)
: name_(name), category_(category), thread_id_(thread_id) {
}
const string& name() const { return name_; }
const string& category() const { return category_; }
int64_t thread_id() const { return thread_id_; }
private:
string name_;
string category_;
int64_t thread_id_;
};
// A ThreadCategory is a set of threads that are logically related.
// TODO: unordered_map is incompatible with boost::thread::id, but would be more
// efficient here.
struct ThreadCategory {
int64_t num_threads_created;
map<const thread::id, ThreadDescriptor> threads_by_id;
};
// All thread categorys, keyed on the category name.
typedef map<string, ThreadCategory> ThreadCategoryMap;
// Protects thread_categories_ and metrics_enabled_
mutex lock_;
// All thread categorys that ever contained a thread, even if empty
ThreadCategoryMap thread_categories_;
// True after StartInstrumentation(..) returns
bool metrics_enabled_;
// Metrics to track all-time total number of threads, and the
// current number of running threads.
IntGauge* total_threads_metric_;
IntGauge* current_num_threads_metric_;
};
Status ThreadMgr::StartInstrumentation(MetricGroup* metrics) {
DCHECK(metrics != NULL);
lock_guard<mutex> l(lock_);
metrics_enabled_ = true;
total_threads_metric_ = metrics->AddGauge("thread-manager.total-threads-created", 0L);
current_num_threads_metric_ = metrics->AddGauge("thread-manager.running-threads", 0L);
return Status::OK();
}
void ThreadMgr::AddThread(const thread::id& thread, const string& name,
const string& category, int64_t tid) {
lock_guard<mutex> l(lock_);
ThreadCategory& thread_category = thread_categories_[category];
thread_category.threads_by_id[thread] = ThreadDescriptor(category, name, tid);
++thread_category.num_threads_created;
if (metrics_enabled_) {
current_num_threads_metric_->Increment(1L);
total_threads_metric_->Increment(1L);
}
}
void ThreadMgr::RemoveThread(const thread::id& boost_id, const string& category) {
lock_guard<mutex> l(lock_);
ThreadCategoryMap::iterator category_it = thread_categories_.find(category);
DCHECK(category_it != thread_categories_.end());
category_it->second.threads_by_id.erase(boost_id);
if (metrics_enabled_) current_num_threads_metric_->Increment(-1L);
}
void ThreadMgr::GetThreadOverview(Document* document) {
lock_guard<mutex> l(lock_);
if (metrics_enabled_) {
document->AddMember("total_threads", current_num_threads_metric_->GetValue(),
document->GetAllocator());
}
Value lst(kArrayType);
for (const ThreadCategoryMap::value_type& category: thread_categories_) {
Value val(kObjectType);
Value name(category.first.c_str(), document->GetAllocator());
val.AddMember("name", name, document->GetAllocator());
val.AddMember("size", static_cast<uint64_t>(category.second.threads_by_id.size()),
document->GetAllocator());
val.AddMember("num_created",
static_cast<uint64_t>(category.second.num_threads_created),
document->GetAllocator());
// TODO: URLEncode() name?
lst.PushBack(val, document->GetAllocator());
}
document->AddMember("thread-groups", lst, document->GetAllocator());
}
void ThreadMgr::ThreadGroupUrlCallback(const Webserver::WebRequest& req,
Document* document) {
lock_guard<mutex> l(lock_);
vector<const ThreadCategory*> categories_to_print;
const auto& args = req.parsed_args;
Webserver::ArgumentMap::const_iterator category_it = args.find("group");
string category_name = (category_it == args.end()) ? "all" : category_it->second;
if (category_name != "all") {
ThreadCategoryMap::const_iterator category =
thread_categories_.find(category_name);
if (category == thread_categories_.end()) {
return;
}
categories_to_print.push_back(&category->second);
Value val(kObjectType);
Value name(category->first.c_str(), document->GetAllocator());
val.AddMember("category", name, document->GetAllocator());
val.AddMember("size", static_cast<uint64_t>(category->second.threads_by_id.size()),
document->GetAllocator());
document->AddMember("thread-group", val, document->GetAllocator());
} else {
for (const ThreadCategoryMap::value_type& category: thread_categories_) {
categories_to_print.push_back(&category.second);
}
}
Value lst(kArrayType);
for (const ThreadCategory* category: categories_to_print) {
for (const auto& thread : category->threads_by_id) {
Value val(kObjectType);
Value name(thread.second.name().c_str(), document->GetAllocator());
val.AddMember("name", name, document->GetAllocator());
val.AddMember("id", thread.second.thread_id(), document->GetAllocator());
ThreadStats stats;
Status status = GetThreadStats(thread.second.thread_id(), &stats);
if (!status.ok()) {
LOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
<< status.GetDetail();
} else {
val.AddMember("user_ns", static_cast<double>(stats.user_ns) / 1e9,
document->GetAllocator());
val.AddMember("kernel_ns", static_cast<double>(stats.kernel_ns) / 1e9,
document->GetAllocator());
val.AddMember("iowait_ns", static_cast<double>(stats.iowait_ns) / 1e9,
document->GetAllocator());
}
lst.PushBack(val, document->GetAllocator());
}
}
document->AddMember("threads", lst, document->GetAllocator());
}
Status Thread::StartThread(const std::string& category, const std::string& name,
const ThreadFunctor& functor, unique_ptr<Thread>* thread,
bool fault_injection_eligible) {
DCHECK(thread_manager.get() != nullptr)
<< "Thread created before InitThreading called";
DCHECK(thread->get() == nullptr);
#ifndef NDEBUG
if (fault_injection_eligible && FLAGS_thread_creation_fault_injection) {
// Fail roughly 1% of the time on eligible codepaths.
if ((rand() % 100) == 1) {
return Status(Substitute("Fake thread creation failure (category: $0, name: $1)",
category, name));
}
}
#endif
unique_ptr<Thread> t(new Thread(category, name));
Promise<int64_t> thread_started;
try {
t->thread_.reset(
new boost::thread(&Thread::SuperviseThread, t->name_, t->category_, functor,
GetThreadDebugInfo(), &thread_started));
} catch (boost::thread_resource_error& e) {
return Status(TErrorCode::THREAD_CREATION_FAILED, name, category, e.what());
}
// TODO: This slows down thread creation although not enormously. To make this faster,
// consider delaying thread_started.Get() until the first call to tid(), but bear in
// mind that some coordination is required between SuperviseThread() and this to make
// sure that the thread is still available to have its tid set.
t->tid_ = thread_started.Get();
VLOG(2) << "Started thread " << t->tid() << " - " << category << ":" << name;
*thread = move(t);
return Status::OK();
}
void Thread::SuperviseThread(const string& name, const string& category,
Thread::ThreadFunctor functor, const ThreadDebugInfo* parent_thread_info,
Promise<int64_t>* thread_started) {
int64_t system_tid = syscall(SYS_gettid);
if (system_tid == -1) {
string error_msg = GetStrErrMsg();
LOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg;
}
// Make a copy, since we want to refer to these variables after the unsafe point below.
string category_copy = category.empty() ? "no-category" : category;;
shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
string name_copy = name.empty() ? Substitute("thread-$0", system_tid) : name;
// Use boost's get_id rather than the system thread ID as the unique key for this thread
// since the latter is more prone to being recycled.
thread_mgr_ref->AddThread(this_thread::get_id(), name_copy, category_copy, system_tid);
ThreadDebugInfo thread_debug_info;
thread_debug_info.SetThreadName(name_copy);
thread_debug_info.SetParentInfo(parent_thread_info);
thread_started->Set(system_tid);
// Any reference to any parameter not copied in by value may no longer be valid after
// this point, since the caller that is waiting on *tid != 0 may wake, take the lock and
// destroy the enclosing Thread object.
functor();
thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy);
}
void ThreadGroup::AddThread(unique_ptr<Thread>&& thread) {
threads_.emplace_back(move(thread));
}
void ThreadGroup::JoinAll() {
for (auto& thread : threads_) thread->Join();
}
int ThreadGroup::Size() const {
return threads_.size();
}
namespace {
void RegisterUrlCallbacks(bool include_jvm_threads, Webserver* webserver) {
DCHECK(webserver != nullptr);
auto overview_callback = [include_jvm_threads]
(const Webserver::WebRequest& req, Document* doc) {
ThreadOverviewUrlCallback(include_jvm_threads, req, doc);
};
webserver->RegisterUrlCallback(
THREADS_WEB_PAGE, THREADS_TEMPLATE, overview_callback, true);
auto group_callback = [] (const Webserver::WebRequest& req, Document* doc) {
thread_manager->ThreadGroupUrlCallback(req, doc);
};
webserver->RegisterUrlCallback(THREAD_GROUP_WEB_PAGE, THREAD_GROUP_TEMPLATE,
group_callback, false);
if (include_jvm_threads) {
auto jvm_threads_callback = [] (const Webserver::WebRequest& req, Document* doc) {
JvmThreadsUrlCallback(req, doc);
};
webserver->RegisterUrlCallback(JVM_THREADS_WEB_PAGE, JVM_THREADS_TEMPLATE,
jvm_threads_callback, false);
}
}
void ThreadOverviewUrlCallback(bool include_jvm_threads,
const Webserver::WebRequest& req, Document* document) {
thread_manager->GetThreadOverview(document);
if (!include_jvm_threads) return;
// Add information about the JVM threads
TGetJvmThreadsInfoRequest request;
request.get_complete_info = false;
TGetJvmThreadsInfoResponse response;
Status status = JniUtil::GetJvmThreadsInfo(request, &response);
if (!status.ok()) {
Value error(Substitute("Couldn't retrieve information about JVM threads: $0",
status.GetDetail()).c_str(), document->GetAllocator());
document->AddMember("error", error, document->GetAllocator());
return;
}
Value jvm_threads_val(kObjectType);
jvm_threads_val.AddMember("name", "jvm", document->GetAllocator());
jvm_threads_val.AddMember("total", response.total_thread_count,
document->GetAllocator());
jvm_threads_val.AddMember("daemon", response.daemon_thread_count,
document->GetAllocator());
document->AddMember("jvm-threads", jvm_threads_val, document->GetAllocator());
}
void JvmThreadsUrlCallback(const Webserver::WebRequest& req, Document* doc) {
DCHECK(doc != NULL);
TGetJvmThreadsInfoRequest request;
request.get_complete_info = true;
TGetJvmThreadsInfoResponse response;
Status status = JniUtil::GetJvmThreadsInfo(request, &response);
if (!status.ok()) {
Value error(Substitute("Couldn't retrieve information about JVM threads: $0",
status.GetDetail()).c_str(), doc->GetAllocator());
doc->AddMember("error", error, doc->GetAllocator());
return;
}
Value overview(kObjectType);
overview.AddMember("thread_count", response.total_thread_count, doc->GetAllocator());
overview.AddMember("daemon_count", response.daemon_thread_count, doc->GetAllocator());
overview.AddMember("peak_count", response.peak_thread_count, doc->GetAllocator());
doc->AddMember("overview", overview, doc->GetAllocator());
Value lst(kArrayType);
for (const TJvmThreadInfo& thread: response.threads) {
Value val(kObjectType);
Value summary(thread.summary.c_str(), doc->GetAllocator());
val.AddMember("summary", summary, doc->GetAllocator());
val.AddMember("cpu_time_sec", static_cast<double>(thread.cpu_time_in_ns) / 1e9,
doc->GetAllocator());
val.AddMember("user_time_sec", static_cast<double>(thread.user_time_in_ns) / 1e9,
doc->GetAllocator());
val.AddMember("blocked_time_ms", thread.blocked_time_in_ms, doc->GetAllocator());
val.AddMember("blocked_count", thread.blocked_count, doc->GetAllocator());
val.AddMember("is_native", thread.is_in_native, doc->GetAllocator());
lst.PushBack(val, doc->GetAllocator());
}
doc->AddMember("jvm-threads", lst, doc->GetAllocator());
}
}
void InitThreading() {
DCHECK(thread_manager.get() == nullptr);
thread_manager.reset(new ThreadMgr());
}
Status StartThreadInstrumentation(MetricGroup* metrics, Webserver* webserver,
bool include_jvm_threads) {
DCHECK(metrics != nullptr);
DCHECK(webserver != nullptr);
RETURN_IF_ERROR(thread_manager->StartInstrumentation(metrics));
RegisterUrlCallbacks(include_jvm_threads, webserver);
return Status::OK();
}
}