// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/server/diagnostics_log.h"
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <ostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sparsehash/dense_hash_set>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/array_view.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/rolling_log.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
using std::pair;
using std::priority_queue;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
// GLog already implements symbolization. Just import their hidden symbol.
namespace google {
// Symbolizes a program counter. On success, returns true and writes the
// symbol name to "out". The symbol name is demangled if possible
// (supports symbols generated by GCC 3.x or newer). Otherwise,
// returns false.
bool Symbolize(void *pc, char *out, size_t out_size);
} // namespace google
DEFINE_int32(diagnostics_log_stack_traces_interval_ms, 60000,
"The interval at which the server will a snapshot of its thread stacks to the "
"diagnostics log. In fact, the server will log at a random interval betweeen "
"zero and twice the configured value to avoid biasing samples towards periodic "
"processes which happen exactly on some particular schedule. If this is set to "
"0, stack traces will be not be periodically logged, but will still be logged "
"on events such as queue overflows. Setting this to a negative value disables "
"stack trace logging entirely.");
TAG_FLAG(diagnostics_log_stack_traces_interval_ms, runtime);
TAG_FLAG(diagnostics_log_stack_traces_interval_ms, experimental);
namespace kudu {
namespace server {
// Track which symbols have been emitted to the log already.
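// The set backs the dictionary encoding used by LogStacks() below: the first
// time a frame address is seen within a log segment it is emitted once in a
// 'symbols' record, and later 'stacks' records refer to it by address only.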
class DiagnosticsLog::SymbolSet {
public:
SymbolSet() {
set_.set_empty_key(nullptr);
}
// Return true if the addr was added, false if it already existed.
bool Add(void* addr) {
// We can't add nullptr since that's the 'empty' key. However, a null address
// will never have a real symbol anyway, so we just pretend it's already
// present.
return addr && InsertIfNotPresent(&set_, addr);
}
void ResetIfLogRolled(int roll_count) {
if (roll_count_ != roll_count) {
roll_count_ = roll_count;
set_.clear();
}
}
private:
int roll_count_ = 0;
google::dense_hash_set<void*> set_;
};
DiagnosticsLog::DiagnosticsLog(string log_dir,
string program_name,
MetricRegistry* metric_registry) :
log_dir_(std::move(log_dir)),
program_name_(std::move(program_name)),
metric_registry_(metric_registry),
wake_(&lock_),
metrics_log_interval_(MonoDelta::FromSeconds(60)),
symbols_(new SymbolSet()) {
}
DiagnosticsLog::~DiagnosticsLog() {
Stop();
}
void DiagnosticsLog::SetMetricsLogInterval(MonoDelta interval) {
MutexLock l(lock_);
metrics_log_interval_ = interval;
}
void DiagnosticsLog::DumpStacksNow(std::string reason) {
MutexLock l(lock_);
dump_stacks_now_reason_ = std::move(reason);
wake_.Signal();
}
Status DiagnosticsLog::Start() {
unique_ptr<RollingLog> l(new RollingLog(Env::Default(), log_dir_, program_name_, "diagnostics"));
RETURN_NOT_OK_PREPEND(l->Open(), "unable to open diagnostics log");
log_ = std::move(l);
Status s = Thread::Create("server", "diag-logger",
[this]() { this->RunThread(); }, &thread_);
if (!s.ok()) {
// Don't leave the log open if we failed to start our thread.
log_.reset();
}
return s;
}
void DiagnosticsLog::Stop() {
if (!thread_) return;
{
MutexLock l(lock_);
stop_ = true;
wake_.Signal();
}
thread_->Join();
thread_.reset();
stop_ = false;
WARN_NOT_OK(log_->Close(), "Unable to close diagnostics log");
}
MonoTime DiagnosticsLog::ComputeNextWakeup(DiagnosticsLog::WakeupType type) const {
switch (type) {
case WakeupType::STACKS:
if (FLAGS_diagnostics_log_stack_traces_interval_ms > 0) {
// Instead of directly using the configured interval, we use a uniform random
// interval whose mean is the configured value. This prevents biasing our stack
// samples. For example, if there is some background process which happens once a
// minute, and stack sampling is also configured to once a minute, an operator
// might incorrectly surmise that the background task was _always_ running.
// Randomizing the samples avoids such correlations.
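// For example, with the default of 60000 ms the next wake-up is drawn
// uniformly from [0, 120000) ms, so the mean sampling period is still
// about one minute.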
Random rng(GetRandomSeed32());
int64_t ms = rng.Uniform(FLAGS_diagnostics_log_stack_traces_interval_ms * 2);
return MonoTime::Now() + MonoDelta::FromMilliseconds(ms);
} else {
// Stack tracing is disabled. However we still wake up periodically because the
// flag is runtime-modifiable, and we need to wake up to notice that it might have
// changed.
return MonoTime::Now() + MonoDelta::FromSeconds(5);
}
break;
case WakeupType::METRICS:
return MonoTime::Now() + metrics_log_interval_;
}
__builtin_unreachable();
}
void DiagnosticsLog::RunThread() {
MutexLock l(lock_);
// Set up a priority queue which tracks our future scheduled wake-ups.
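// Using std::greater<> makes the priority queue a min-heap, so wakeups.top()
// is always the soonest scheduled wake-up.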
typedef pair<MonoTime, WakeupType> QueueElem;
priority_queue<QueueElem, vector<QueueElem>, std::greater<QueueElem>> wakeups;
wakeups.emplace(ComputeNextWakeup(WakeupType::METRICS), WakeupType::METRICS);
wakeups.emplace(ComputeNextWakeup(WakeupType::STACKS), WakeupType::STACKS);
while (!stop_) {
MonoTime next_log = wakeups.top().first;
wake_.WaitUntil(next_log);
string reason;
WakeupType what;
if (dump_stacks_now_reason_) {
what = WakeupType::STACKS;
reason = std::move(*dump_stacks_now_reason_);
dump_stacks_now_reason_.reset();
} else if (MonoTime::Now() >= next_log) {
what = wakeups.top().second;
reason = "periodic";
wakeups.pop();
wakeups.emplace(ComputeNextWakeup(what), what);
} else {
// Spurious wakeup, or a stop trigger.
continue;
}
// Unlock the mutex while actually logging metrics or stacks since it's somewhat
// slow and we don't want to block threads trying to signal us.
l.Unlock();
SCOPED_CLEANUP({ l.Lock(); });
Status s;
if (what == WakeupType::METRICS) {
WARN_NOT_OK(LogMetrics(), "Unable to collect metrics to diagnostics log");
} else if (what == WakeupType::STACKS && FLAGS_diagnostics_log_stack_traces_interval_ms >= 0) {
WARN_NOT_OK(LogStacks(reason), "Unable to collect stacks to diagnostics log");
}
}
}
Status DiagnosticsLog::LogStacks(const string& reason) {
StackTraceSnapshot snap;
snap.set_capture_thread_names(false);
RETURN_NOT_OK(snap.SnapshotAllStacks());
std::ostringstream buf;
MicrosecondsInt64 now = GetCurrentTimeMicros();
// Because symbols are potentially long strings, and likely to be
// very repetitive, we do a sort of dictionary encoding here. When
// we roll a file, we clear our symbol table. Then, within that
// file, the first time we see any address, we add it to the table
// and make sure it is output in a 'symbols' record. Subsequent
// repetitions of the same address do not need to re-output the
// symbol.
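// For example, if the same frame address appears in many sampled stacks, its
// symbol string is written once per log segment and every 'stacks' record
// after that refers to it only by its hex address.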
symbols_->ResetIfLogRolled(log_->roll_count());
vector<std::pair<void*, string>> new_symbols;
snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
const StackTrace& stack = group[0].stack;
for (int i = 0; i < stack.num_frames(); i++) {
void* addr = stack.frame(i);
if (symbols_->Add(addr)) {
char symbol_buf[1024];
// Subtract 1 from the address before symbolizing, because the
// address on the stack is actually the return address of the function
// call rather than the address of the call instruction itself.
if (google::Symbolize(static_cast<char*>(addr) - 1, symbol_buf, sizeof(symbol_buf))) {
new_symbols.emplace_back(addr, symbol_buf);
}
// If symbolization fails, don't bother adding it. Readers of the log
// will just see that it's missing from the symbol map and should handle that
// as an unknown symbol.
}
}
});
if (!new_symbols.empty()) {
buf << "I" << FormatTimestampForLog(now)
<< " symbols " << now << " ";
JsonWriter jw(&buf, JsonWriter::COMPACT);
jw.StartObject();
for (auto& p : new_symbols) {
jw.String(StringPrintf("%p", p.first));
jw.String(p.second);
}
jw.EndObject();
buf << "\n";
}
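// The 'symbols' record assembled above is a single log line of the rough form
// (address and symbol below are hypothetical, for illustration only):
//   I<timestamp> symbols <microseconds> {"0x55d3f1a2b3c4":"kudu::Foo()",...}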
buf << "I" << FormatTimestampForLog(now) << " stacks " << now << " ";
JsonWriter jw(&buf, JsonWriter::COMPACT);
jw.StartObject();
jw.String("reason");
jw.String(reason);
jw.String("groups");
jw.StartArray();
snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
jw.StartObject();
jw.String("tids");
jw.StartArray();
for (auto& t : group) {
// TODO(todd): should we also output the thread names, perhaps in
// a sort of dictionary fashion? It's more instructive but in
// practice the stack traces should usually indicate the work
// that's being done, anyway, which is enough to tie back to the
// thread. The TID is smaller and useful for correlating against
// messages in the normal glog as well.
jw.Int64(t.tid);
}
jw.EndArray();
jw.String("stack");
jw.StartArray();
const StackTrace& stack = group[0].stack;
for (int i = 0; i < stack.num_frames(); i++) {
jw.String(StringPrintf("%p", stack.frame(i)));
}
jw.EndArray();
jw.EndObject();
});
jw.EndArray(); // array of thread groups
jw.EndObject(); // end of top-level object
buf << "\n";
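// The 'stacks' record assembled above is a single log line of the rough form
// (tids and addresses below are hypothetical, for illustration only):
//   I<timestamp> stacks <microseconds> {"reason":"periodic",
//     "groups":[{"tids":[123],"stack":["0x55d3f1a2b3c4",...]},...]}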
RETURN_NOT_OK(log_->Append(buf.str()));
return Status::OK();
}
Status DiagnosticsLog::LogMetrics() {
MetricJsonOptions opts;
opts.include_raw_histograms = true;
opts.only_modified_in_or_after_epoch = metrics_epoch_;
// We don't output any metrics which have never been incremented. Though
// this seems redundant with the "only include changed metrics" above, it
// also ensures that we don't dump a bunch of zero data on startup.
opts.include_untouched_metrics = false;
// Entity attributes aren't that useful in the context of this log. We can
// always grab the entity attributes separately if necessary.
opts.include_entity_attributes = false;
std::ostringstream buf;
MicrosecondsInt64 now = GetCurrentTimeMicros();
buf << "I" << FormatTimestampForLog(now)
<< " metrics " << now << " ";
// Collect the metrics JSON string.
int64_t this_log_epoch = Metric::current_epoch();
Metric::IncrementEpoch();
JsonWriter writer(&buf, JsonWriter::COMPACT);
RETURN_NOT_OK(metric_registry_->WriteAsJson(&writer, opts));
buf << "\n";
RETURN_NOT_OK(log_->Append(buf.str()));
// Next time we fetch, only show those that changed after the epoch
// we just logged.
//
// NOTE: we only bump this in the successful log case so that if we failed to
// write above, we wouldn't skip any changes.
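// For example, if this_log_epoch is 41, the next collection requests metrics
// modified in or after epoch 42, i.e. only those that changed since this dump.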
metrics_epoch_ = this_log_epoch + 1;
return Status::OK();
}
} // namespace server
} // namespace kudu