blob: fd68cfe8b182a0880dc97d9d2efc8a7c37992e5a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/runtime-profile-counters.h"
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <utility>
#include <boost/bind.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/thread.hpp>
#include "common/object-pool.h"
#include "gutil/strings/strip.h"
#include "kudu/util/logging.h"
#include "rpc/thrift-util.h"
#include "runtime/mem-tracker.h"
#include "util/coding-util.h"
#include "util/compress.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/periodic-counter-updater.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "util/scope-exit-trigger.h"
#include "util/ubsan.h"
#include "common/names.h"
DECLARE_int32(status_report_interval_ms);
DECLARE_int32(periodic_counter_update_period_ms);
using namespace rapidjson;
namespace impala {
// Thread counters name
static const string THREAD_TOTAL_TIME = "TotalWallClockTime";
static const string THREAD_USER_TIME = "UserTime";
static const string THREAD_SYS_TIME = "SysTime";
static const string THREAD_VOLUNTARY_CONTEXT_SWITCHES = "VoluntaryContextSwitches";
static const string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryContextSwitches";
// The root counter name for all top level counters.
static const string ROOT_COUNTER = "";
const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime";
const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name,
bool is_averaged_profile) {
return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile));
}
RuntimeProfile::RuntimeProfile(
ObjectPool* pool, const string& name, bool is_averaged_profile)
: pool_(pool),
name_(name),
is_averaged_profile_(is_averaged_profile),
counter_total_time_(TUnit::TIME_NS),
inactive_timer_(TUnit::TIME_NS),
local_time_percent_(0),
local_time_ns_(0) {
Counter* total_time_counter;
Counter* inactive_timer;
if (!is_averaged_profile) {
total_time_counter = &counter_total_time_;
inactive_timer = &inactive_timer_;
} else {
total_time_counter = pool->Add(new AveragedCounter(TUnit::TIME_NS));
inactive_timer = pool->Add(new AveragedCounter(TUnit::TIME_NS));
}
counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter;
counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer;
}
RuntimeProfile::~RuntimeProfile() {
DCHECK(!has_active_periodic_counters_);
}
void RuntimeProfile::StopPeriodicCounters() {
lock_guard<SpinLock> l(counter_map_lock_);
if (!has_active_periodic_counters_) return;
for (Counter* sampling_counter : sampling_counters_) {
PeriodicCounterUpdater::StopSamplingCounter(sampling_counter);
}
for (Counter* rate_counter : rate_counters_) {
PeriodicCounterUpdater::StopRateCounter(rate_counter);
}
for (vector<Counter*>* counters : bucketing_counters_) {
PeriodicCounterUpdater::StopBucketingCounters(counters);
}
for (auto& time_series_counter_entry : time_series_counter_map_) {
PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_counter_entry.second);
}
has_active_periodic_counters_ = false;
}
RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
const TRuntimeProfileTree& profiles) {
if (profiles.nodes.size() == 0) return NULL;
int idx = 0;
RuntimeProfile* profile = RuntimeProfile::CreateFromThrift(pool, profiles.nodes, &idx);
profile->SetTExecSummary(profiles.exec_summary);
return profile;
}
RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
const vector<TRuntimeProfileNode>& nodes, int* idx) {
DCHECK_LT(*idx, nodes.size());
const TRuntimeProfileNode& node = nodes[*idx];
RuntimeProfile* profile = Create(pool, node.name);
profile->metadata_ = node.node_metadata;
for (int i = 0; i < node.counters.size(); ++i) {
const TCounter& counter = node.counters[i];
profile->counter_map_[counter.name] =
pool->Add(new Counter(counter.unit, counter.value));
}
if (node.__isset.event_sequences) {
for (const TEventSequence& sequence: node.event_sequences) {
profile->event_sequence_map_[sequence.name] =
pool->Add(new EventSequence(sequence.timestamps, sequence.labels));
}
}
if (node.__isset.time_series_counters) {
for (const TTimeSeriesCounter& val: node.time_series_counters) {
// Capture all incoming time series counters with the same type since re-sampling
// will have happened on the sender side.
profile->time_series_counter_map_[val.name] = pool->Add(
new ChunkedTimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
}
}
if (node.__isset.summary_stats_counters) {
for (const TSummaryStatsCounter& val: node.summary_stats_counters) {
profile->summary_stats_map_[val.name] =
pool->Add(new SummaryStatsCounter(
val.unit, val.total_num_values, val.min_value, val.max_value, val.sum));
}
}
profile->child_counter_map_ = node.child_counters_map;
profile->info_strings_ = node.info_strings;
profile->info_strings_display_order_ = node.info_strings_display_order;
++*idx;
for (int i = 0; i < node.num_children; ++i) {
profile->AddChild(RuntimeProfile::CreateFromThrift(pool, nodes, idx));
}
return profile;
}
void RuntimeProfile::UpdateAverage(RuntimeProfile* other) {
DCHECK(other != NULL);
DCHECK(is_averaged_profile_);
// Merge this level
{
CounterMap::iterator dst_iter;
CounterMap::const_iterator src_iter;
lock_guard<SpinLock> l(counter_map_lock_);
lock_guard<SpinLock> m(other->counter_map_lock_);
for (src_iter = other->counter_map_.begin();
src_iter != other->counter_map_.end(); ++src_iter) {
// Ignore this counter for averages.
if (src_iter->first == INACTIVE_TIME_COUNTER_NAME) continue;
dst_iter = counter_map_.find(src_iter->first);
AveragedCounter* avg_counter;
// Get the counter with the same name in dst_iter (this->counter_map_)
// Create one if it doesn't exist.
if (dst_iter == counter_map_.end()) {
avg_counter = pool_->Add(new AveragedCounter(src_iter->second->unit()));
counter_map_[src_iter->first] = avg_counter;
} else {
DCHECK(dst_iter->second->unit() == src_iter->second->unit());
avg_counter = static_cast<AveragedCounter*>(dst_iter->second);
}
avg_counter->UpdateCounter(src_iter->second);
}
// TODO: Can we unlock the counter_map_lock_ here?
ChildCounterMap::const_iterator child_counter_src_itr;
for (child_counter_src_itr = other->child_counter_map_.begin();
child_counter_src_itr != other->child_counter_map_.end();
++child_counter_src_itr) {
set<string>* child_counters = FindOrInsert(&child_counter_map_,
child_counter_src_itr->first, set<string>());
child_counters->insert(child_counter_src_itr->second.begin(),
child_counter_src_itr->second.end());
}
}
{
lock_guard<SpinLock> l(children_lock_);
lock_guard<SpinLock> m(other->children_lock_);
// Recursively merge children with matching names.
// Track the current position in the vector so we preserve the order of children
// if children are added after the first Update()/UpdateAverage() call (IMPALA-6694).
// E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
// then this code makes sure that children_ is [A, B, C, D] afterwards.
ChildVector::iterator insert_pos = children_.begin();
for (int i = 0; i < other->children_.size(); ++i) {
RuntimeProfile* other_child = other->children_[i].first;
ChildMap::iterator j = child_map_.find(other_child->name_);
RuntimeProfile* child = NULL;
if (j != child_map_.end()) {
child = j->second;
// Search forward until the insert position is either at the end of the vector
// or after this child. This preserves the order if the relative order of
// children in all updates is consistent.
bool found_child = false;
while (insert_pos != children_.end() && !found_child) {
found_child = insert_pos->first == child;
++insert_pos;
}
} else {
child = Create(pool_, other_child->name_, true);
child->metadata_ = other_child->metadata_;
bool indent_other_child = other->children_[i].second;
child_map_[child->name_] = child;
insert_pos = children_.insert(insert_pos, make_pair(child, indent_other_child));
++insert_pos;
}
child->UpdateAverage(other_child);
}
}
ComputeTimeInProfile();
}
void RuntimeProfile::Update(const TRuntimeProfileTree& thrift_profile) {
int idx = 0;
Update(thrift_profile.nodes, &idx);
DCHECK_EQ(idx, thrift_profile.nodes.size());
}
void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx) {
if (UNLIKELY(nodes.size()) == 0) return;
DCHECK_LT(*idx, nodes.size());
const TRuntimeProfileNode& node = nodes[*idx];
{
// Update this level.
map<string, Counter*>::iterator dst_iter;
lock_guard<SpinLock> l(counter_map_lock_);
for (int i = 0; i < node.counters.size(); ++i) {
const TCounter& tcounter = node.counters[i];
CounterMap::iterator j = counter_map_.find(tcounter.name);
if (j == counter_map_.end()) {
counter_map_[tcounter.name] =
pool_->Add(new Counter(tcounter.unit, tcounter.value));
} else {
if (j->second->unit() != tcounter.unit) {
LOG(ERROR) << "Cannot update counters with the same name ("
<< j->first << ") but different units.";
} else {
j->second->Set(tcounter.value);
}
}
}
ChildCounterMap::const_iterator child_counter_src_itr;
for (child_counter_src_itr = node.child_counters_map.begin();
child_counter_src_itr != node.child_counters_map.end();
++child_counter_src_itr) {
set<string>* child_counters = FindOrInsert(&child_counter_map_,
child_counter_src_itr->first, set<string>());
child_counters->insert(child_counter_src_itr->second.begin(),
child_counter_src_itr->second.end());
}
}
{
const InfoStrings& info_strings = node.info_strings;
lock_guard<SpinLock> l(info_strings_lock_);
for (const string& key: node.info_strings_display_order) {
// Look for existing info strings and update in place. If there
// are new strings, add them to the end of the display order.
// TODO: Is nodes.info_strings always a superset of
// info_strings_? If so, can just copy the display order.
InfoStrings::const_iterator it = info_strings.find(key);
DCHECK(it != info_strings.end());
InfoStrings::iterator existing = info_strings_.find(key);
if (existing == info_strings_.end()) {
info_strings_.emplace(key, it->second);
info_strings_display_order_.push_back(key);
} else {
info_strings_[key] = it->second;
}
}
}
{
lock_guard<SpinLock> l(counter_map_lock_);
for (int i = 0; i < node.time_series_counters.size(); ++i) {
const TTimeSeriesCounter& c = node.time_series_counters[i];
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
if (it == time_series_counter_map_.end()) {
// Capture all incoming time series counters with the same type since re-sampling
// will have happened on the sender side.
time_series_counter_map_[c.name] = pool_->Add(
new ChunkedTimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
} else {
int64_t start_idx = c.__isset.start_index ? c.start_index : 0;
it->second->SetSamples(c.period_ms, c.values, start_idx);
}
}
}
{
lock_guard<SpinLock> l(event_sequence_lock_);
for (int i = 0; i < node.event_sequences.size(); ++i) {
const TEventSequence& seq = node.event_sequences[i];
EventSequenceMap::iterator it = event_sequence_map_.find(seq.name);
if (it == event_sequence_map_.end()) {
event_sequence_map_[seq.name] =
pool_->Add(new EventSequence(seq.timestamps, seq.labels));
} else {
it->second->AddNewerEvents(seq.timestamps, seq.labels);
}
}
}
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
for (int i = 0; i < node.summary_stats_counters.size(); ++i) {
const TSummaryStatsCounter& c = node.summary_stats_counters[i];
SummaryStatsCounterMap::iterator it = summary_stats_map_.find(c.name);
if (it == summary_stats_map_.end()) {
summary_stats_map_[c.name] =
pool_->Add(new SummaryStatsCounter(
c.unit, c.total_num_values, c.min_value, c.max_value, c.sum));
} else {
it->second->SetStats(c);
}
}
}
++*idx;
{
lock_guard<SpinLock> l(children_lock_);
// Track the current position in the vector so we preserve the order of children
// if children are added after the first Update()/UpdateAverage() call (IMPALA-6694).
// E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
// then this code makes sure that children_ is [A, B, C, D] afterwards.
ChildVector::iterator insert_pos = children_.begin();
// Update children with matching names; create new ones if they don't match.
for (int i = 0; i < node.num_children; ++i) {
const TRuntimeProfileNode& tchild = nodes[*idx];
ChildMap::iterator j = child_map_.find(tchild.name);
RuntimeProfile* child = NULL;
if (j != child_map_.end()) {
child = j->second;
// Search forward until the insert position is either at the end of the vector
// or after this child. This preserves the order if the relative order of
// children in all updates is consistent.
bool found_child = false;
while (insert_pos != children_.end() && !found_child) {
found_child = insert_pos->first == child;
++insert_pos;
}
} else {
child = Create(pool_, tchild.name);
child->metadata_ = tchild.node_metadata;
child_map_[tchild.name] = child;
insert_pos = children_.insert(insert_pos, make_pair(child, tchild.indent));
++insert_pos;
}
child->Update(nodes, idx);
}
}
}
void RuntimeProfile::Divide(int n) {
DCHECK_GT(n, 0);
map<string, Counter*>::iterator iter;
{
lock_guard<SpinLock> l(counter_map_lock_);
for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) {
if (iter->second->unit() == TUnit::DOUBLE_VALUE) {
iter->second->Set(iter->second->double_value() / n);
} else {
iter->second->value_.Store(iter->second->value() / n);
}
}
}
{
lock_guard<SpinLock> l(children_lock_);
for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
i->second->Divide(n);
}
}
}
void RuntimeProfile::ComputeTimeInProfile() {
ComputeTimeInProfile(total_time_counter()->value());
}
void RuntimeProfile::ComputeTimeInProfile(int64_t total) {
// Recurse on children. After this, childrens' total time is up to date.
{
lock_guard<SpinLock> l(children_lock_);
for (int i = 0; i < children_.size(); ++i) {
children_[i].first->ComputeTimeInProfile();
}
}
// Get total time from children
int64_t children_total_time = 0;
{
lock_guard<SpinLock> l(children_lock_);
for (int i = 0; i < children_.size(); ++i) {
children_total_time += children_[i].first->total_time();
}
}
// IMPALA-5200: Take the max, because the parent includes all of the time from the
// children, whether or not its total time counter has been updated recently enough
// to see this.
total_time_ns_ = max(children_total_time, total_time_counter()->value());
// If a local time counter exists, use its value as local time. Otherwise, derive the
// local time from total time and the child time.
bool has_local_time_counter = false;
{
lock_guard<SpinLock> l(counter_map_lock_);
CounterMap::const_iterator itr = counter_map_.find(LOCAL_TIME_COUNTER_NAME);
if (itr != counter_map_.end()) {
local_time_ns_ = itr->second->value();
has_local_time_counter = true;
}
}
if (!has_local_time_counter) {
local_time_ns_ = total_time_ns_ - children_total_time;
if (!is_averaged_profile_) {
local_time_ns_ -= inactive_timer()->value();
}
}
// Counters have some margin, set to 0 if it was negative.
local_time_ns_ = ::max<int64_t>(0, local_time_ns_);
local_time_percent_ =
static_cast<double>(local_time_ns_) / total_time_ns_;
local_time_percent_ = ::min(1.0, local_time_percent_) * 100;
}
void RuntimeProfile::AddChild(RuntimeProfile* child, bool indent, RuntimeProfile* loc) {
lock_guard<SpinLock> l(children_lock_);
ChildVector::iterator insert_pos;
if (loc == NULL) {
insert_pos = children_.end();
} else {
bool found = false;
for (ChildVector::iterator it = children_.begin(); it != children_.end(); ++it) {
if (it->first == loc) {
insert_pos = it + 1;
found = true;
break;
}
}
DCHECK(found) << "Invalid loc";
}
AddChildLocked(child, indent, insert_pos);
}
void RuntimeProfile::AddChildLocked(
RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos) {
children_lock_.DCheckLocked();
DCHECK(child != NULL);
if (child_map_.count(child->name_) > 0) {
// This child has already been added, so do nothing.
// Otherwise, the map and vector will be out of sync.
return;
}
child_map_[child->name_] = child;
children_.insert(insert_pos, make_pair(child, indent));
}
void RuntimeProfile::PrependChild(RuntimeProfile* child, bool indent) {
lock_guard<SpinLock> l(children_lock_);
AddChildLocked(child, indent, children_.begin());
}
RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent,
bool prepend) {
lock_guard<SpinLock> l(children_lock_);
DCHECK(child_map_.find(name) == child_map_.end());
RuntimeProfile* child = Create(pool_, name);
AddChildLocked(child, indent, prepend ? children_.begin() : children_.end());
return child;
}
void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) {
children->clear();
lock_guard<SpinLock> l(children_lock_);
for (const auto& entry : children_) children->push_back(entry.first);
}
void RuntimeProfile::GetAllChildren(vector<RuntimeProfile*>* children) {
lock_guard<SpinLock> l(children_lock_);
for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
children->push_back(i->second);
i->second->GetAllChildren(children);
}
}
void RuntimeProfile::SortChildrenByTotalTime() {
lock_guard<SpinLock> l(children_lock_);
// Create a snapshot of total time values so that they don't change while we're
// sorting. Sort the <total_time, index> pairs, then reshuffle children_.
vector<pair<int64_t, int64_t>> total_times;
for (int i = 0; i < children_.size(); ++i) {
total_times.emplace_back(children_[i].first->total_time_counter()->value(), i);
}
// Order by descending total time.
sort(total_times.begin(), total_times.end(),
[](const pair<int64_t, int64_t>& p1, const pair<int64_t, int64_t>& p2) {
return p1.first > p2.first;
});
ChildVector new_children;
for (const auto& p : total_times) new_children.emplace_back(children_[p.second]);
children_ = move(new_children);
}
void RuntimeProfile::AddInfoString(const string& key, const string& value) {
return AddInfoStringInternal(key, value, false);
}
void RuntimeProfile::AddInfoStringRedacted(const string& key, const string& value) {
return AddInfoStringInternal(key, value, false, true);
}
void RuntimeProfile::AppendInfoString(const string& key, const string& value) {
return AddInfoStringInternal(key, value, true);
}
void RuntimeProfile::AddInfoStringInternal(const string& key, string value,
bool append, bool redact) {
if (redact) Redact(&value);
StripTrailingWhitespace(&value);
lock_guard<SpinLock> l(info_strings_lock_);
InfoStrings::iterator it = info_strings_.find(key);
if (it == info_strings_.end()) {
info_strings_.emplace(key, std::move(value));
info_strings_display_order_.push_back(key);
} else {
if (append) {
it->second += ", " + std::move(value);
} else {
it->second = std::move(value);
}
}
}
void RuntimeProfile::UpdateInfoString(const string& key, string value) {
lock_guard<SpinLock> l(info_strings_lock_);
InfoStrings::iterator it = info_strings_.find(key);
if (it != info_strings_.end()) it->second = std::move(value);
}
const string* RuntimeProfile::GetInfoString(const string& key) const {
lock_guard<SpinLock> l(info_strings_lock_);
InfoStrings::const_iterator it = info_strings_.find(key);
if (it == info_strings_.end()) return NULL;
return &it->second;
}
void RuntimeProfile::AddCodegenMsg(
bool codegen_enabled, const string& extra_info, const string& extra_label) {
string str = codegen_enabled ? "Codegen Enabled" : "Codegen Disabled";
if (!extra_info.empty()) str = str + ": " + extra_info;
if (!extra_label.empty()) str = extra_label + " " + str;
AppendExecOption(str);
}
#define ADD_COUNTER_IMPL(NAME, T) \
RuntimeProfile::T* RuntimeProfile::NAME( \
const string& name, TUnit::type unit, const string& parent_counter_name) { \
lock_guard<SpinLock> l(counter_map_lock_); \
bool dummy; \
return NAME##Locked(name, unit, parent_counter_name, &dummy); \
} \
RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name, \
TUnit::type unit, const string& parent_counter_name, bool* created) { \
counter_map_lock_.DCheckLocked(); \
DCHECK_EQ(is_averaged_profile_, false); \
if (counter_map_.find(name) != counter_map_.end()) { \
*created = false; \
return reinterpret_cast<T*>(counter_map_[name]); \
} \
DCHECK(parent_counter_name == ROOT_COUNTER \
|| counter_map_.find(parent_counter_name) != counter_map_.end()); \
T* counter = pool_->Add(new T(unit)); \
counter_map_[name] = counter; \
set<string>* child_counters = \
FindOrInsert(&child_counter_map_, parent_counter_name, set<string>()); \
child_counters->insert(name); \
*created = true; \
return counter; \
}
ADD_COUNTER_IMPL(AddCounter, Counter);
ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter);
ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter);
RuntimeProfile::DerivedCounter* RuntimeProfile::AddDerivedCounter(
const string& name, TUnit::type unit,
const SampleFunction& counter_fn, const string& parent_counter_name) {
DCHECK_EQ(is_averaged_profile_, false);
lock_guard<SpinLock> l(counter_map_lock_);
if (counter_map_.find(name) != counter_map_.end()) return NULL;
DerivedCounter* counter = pool_->Add(new DerivedCounter(unit, counter_fn));
counter_map_[name] = counter;
set<string>* child_counters =
FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());
child_counters->insert(name);
return counter;
}
RuntimeProfile::ThreadCounters* RuntimeProfile::AddThreadCounters(
const string& prefix) {
ThreadCounters* counter = pool_->Add(new ThreadCounters());
counter->total_time_ = AddCounter(prefix + THREAD_TOTAL_TIME, TUnit::TIME_NS);
counter->user_time_ = AddCounter(prefix + THREAD_USER_TIME, TUnit::TIME_NS,
prefix + THREAD_TOTAL_TIME);
counter->sys_time_ = AddCounter(prefix + THREAD_SYS_TIME, TUnit::TIME_NS,
prefix + THREAD_TOTAL_TIME);
counter->voluntary_context_switches_ =
AddCounter(prefix + THREAD_VOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT);
counter->involuntary_context_switches_ =
AddCounter(prefix + THREAD_INVOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT);
return counter;
}
void RuntimeProfile::AddLocalTimeCounter(const SampleFunction& counter_fn) {
DerivedCounter* local_time_counter = pool_->Add(
new DerivedCounter(TUnit::TIME_NS, counter_fn));
lock_guard<SpinLock> l(counter_map_lock_);
DCHECK(counter_map_.find(LOCAL_TIME_COUNTER_NAME) == counter_map_.end())
<< "LocalTimeCounter already exists in the map.";
counter_map_[LOCAL_TIME_COUNTER_NAME] = local_time_counter;
}
RuntimeProfile::Counter* RuntimeProfile::GetCounter(const string& name) {
lock_guard<SpinLock> l(counter_map_lock_);
if (counter_map_.find(name) != counter_map_.end()) {
return counter_map_[name];
}
return NULL;
}
RuntimeProfile::SummaryStatsCounter* RuntimeProfile::GetSummaryStatsCounter(
const string& name) {
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
return summary_stats_map_[name];
}
return nullptr;
}
void RuntimeProfile::GetCounters(const string& name, vector<Counter*>* counters) {
Counter* c = GetCounter(name);
if (c != NULL) counters->push_back(c);
lock_guard<SpinLock> l(children_lock_);
for (int i = 0; i < children_.size(); ++i) {
children_[i].first->GetCounters(name, counters);
}
}
RuntimeProfile::EventSequence* RuntimeProfile::GetEventSequence(const string& name) const
{
lock_guard<SpinLock> l(event_sequence_lock_);
EventSequenceMap::const_iterator it = event_sequence_map_.find(name);
if (it == event_sequence_map_.end()) return NULL;
return it->second;
}
void RuntimeProfile::ToJson(Document* d) const{
// queryObj that stores all JSON format profile information
Value queryObj(kObjectType);
RuntimeProfile::ToJsonHelper(&queryObj, d);
d->RemoveMember("contents");
d->AddMember("contents", queryObj, d->GetAllocator());
}
void RuntimeProfile::ToJsonCounters(Value* parent, Document* d,
const string& counter_name, const CounterMap& counter_map,
const ChildCounterMap& child_counter_map) const{
auto& allocator = d->GetAllocator();
ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
if (itr != child_counter_map.end()) {
const set<string>& child_counters = itr->second;
for (const string& child_counter: child_counters) {
CounterMap::const_iterator iter = counter_map.find(child_counter);
if (iter == counter_map.end()) continue;
Value counter(kObjectType);
iter->second->ToJson(*d, &counter);
counter.AddMember("counter_name", StringRef(child_counter.c_str()), allocator);
Value child_counters_json(kArrayType);
RuntimeProfile::ToJsonCounters(&child_counters_json, d,
child_counter, counter_map,child_counter_map);
if (!child_counters_json.Empty()){
counter.AddMember("child_counters", child_counters_json, allocator);
}
parent->PushBack(counter, allocator);
}
}
}
void RuntimeProfile::ToJsonHelper(Value* parent, Document* d) const{
Document::AllocatorType& allocator = d->GetAllocator();
// Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock
// while we call value() on the counters (some of those might be DerivedCounters).
CounterMap counter_map;
ChildCounterMap child_counter_map;
{
lock_guard<SpinLock> l(counter_map_lock_);
counter_map = counter_map_;
child_counter_map = child_counter_map_;
}
// 1. Name
Value name(name_.c_str(), allocator);
parent->AddMember("profile_name", name, allocator);
// 2. Num_children
parent->AddMember("num_children", children_.size(), allocator);
// 3. Metadata
// Set the required metadata field to the plan node ID for compatibility with any tools
// that rely on the plan node id being set there.
// Legacy field. May contain the node ID for plan nodes.
// Replaced by node_metadata, which contains richer metadata.
parent->AddMember("metadata",
metadata_.__isset.plan_node_id ? metadata_.plan_node_id : -1, allocator);
// requires exactly one field of a union to be set so we only mark node_metadata
// as set if that is the case.
if (metadata_.__isset.plan_node_id || metadata_.__isset.data_sink_id){
Value node_metadata_json(kObjectType);
if (metadata_.__isset.plan_node_id){
node_metadata_json.AddMember("plan_node_id", metadata_.plan_node_id, allocator);
}
if (metadata_.__isset.data_sink_id){
node_metadata_json.AddMember("data_sink_id", metadata_.data_sink_id, allocator);
}
parent->AddMember("node_metadata", node_metadata_json, allocator);
}
// 4. Info_strings
{
lock_guard<SpinLock> l(info_strings_lock_);
if (!info_strings_.empty()) {
Value info_strings_json(kArrayType);
for (const string& key : info_strings_display_order_) {
Value info_string_json(kObjectType);
Value key_json(key.c_str(), allocator);
auto value_itr = info_strings_.find(key);
DCHECK(value_itr != info_strings_.end());
Value value_json(value_itr->second.c_str(), allocator);
info_string_json.AddMember("key", key_json, allocator);
info_string_json.AddMember("value", value_json, allocator);
info_strings_json.PushBack(info_string_json, allocator);
}
parent->AddMember("info_strings", info_strings_json, allocator);
}
}
// 5. Events
{
lock_guard<SpinLock> l(event_sequence_lock_);
if (!event_sequence_map_.empty()) {
Value event_sequences_json(kArrayType);
for (EventSequenceMap::const_iterator it = event_sequence_map_.begin();
it != event_sequence_map_.end(); ++it) {
Value event_sequence_json(kObjectType);
it->second->ToJson(*d, &event_sequence_json);
event_sequences_json.PushBack(event_sequence_json, allocator);
}
parent->AddMember("event_sequences", event_sequences_json, allocator);
}
}
// 6. Counters
Value counters(kArrayType);
RuntimeProfile::ToJsonCounters(&counters , d, "", counter_map, child_counter_map);
if (!counters.Empty()) {
parent->AddMember("counters", counters, allocator);
}
// 7. SummaryStatsCounter
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (!summary_stats_map_.empty()) {
Value summary_stats_counters_json(kArrayType);
for (const SummaryStatsCounterMap::value_type& v : summary_stats_map_) {
Value summary_stats_counter(kObjectType);
Value summary_name(v.first.c_str(), allocator);
v.second->ToJson(*d, &summary_stats_counter);
// Remove Kind here because it would be redundant information for users
summary_stats_counter.RemoveMember("kind");
summary_stats_counter.AddMember("counter_name", summary_name, allocator);
summary_stats_counters_json.PushBack(summary_stats_counter, allocator);
}
parent->AddMember(
"summary_stats_counters", summary_stats_counters_json, allocator);
}
}
// 8. Time_series_counter_map
{
// Print all time series counters as following:
// - <Name> (<period>): <val1>, <val2>, <etc>
lock_guard<SpinLock> l(counter_map_lock_);
if (!time_series_counter_map_.empty()) {
Value time_series_counters_json(kArrayType);
for (const TimeSeriesCounterMap::value_type& v : time_series_counter_map_) {
TimeSeriesCounter* counter = v.second;
Value time_series_json(kObjectType);
counter->ToJson(*d, &time_series_json);
time_series_counters_json.PushBack(time_series_json, allocator);
}
parent->AddMember("time_series_counters", time_series_counters_json, allocator);
}
}
// 9. Children Runtime Profiles
//
// Create copy of children_ so we don't need to hold lock while we call
// ToJsonHelper() on the children.
ChildVector children;
{
lock_guard<SpinLock> l(children_lock_);
children = children_;
}
if (!children.empty()) {
Value child_profiles(kArrayType);
for (int i = 0; i < children.size(); ++i) {
RuntimeProfile* profile = children[i].first;
Value child_profile(kObjectType);
profile->ToJsonHelper(&child_profile, d);
child_profiles.PushBack(child_profile, allocator);
}
parent->AddMember("child_profiles", child_profiles, allocator);
}
}
// Print the profile:
// 1. Profile Name
// 2. Info Strings
// 3. Counters
// 4. Children
void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
ostream& stream = *s;
// Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock
// while we call value() on the counters (some of those might be DerivedCounters).
CounterMap counter_map;
ChildCounterMap child_counter_map;
{
lock_guard<SpinLock> l(counter_map_lock_);
counter_map = counter_map_;
child_counter_map = child_counter_map_;
}
map<string, Counter*>::const_iterator total_time =
counter_map.find(TOTAL_TIME_COUNTER_NAME);
DCHECK(total_time != counter_map.end());
stream.flags(ios::fixed);
stream << prefix << name_ << ":";
if (total_time->second->value() != 0) {
stream << "(Total: "
<< PrettyPrinter::Print(total_time->second->value(),
total_time->second->unit())
<< ", non-child: "
<< PrettyPrinter::Print(local_time_ns_, TUnit::TIME_NS)
<< ", % non-child: "
<< setprecision(2) << local_time_percent_
<< "%)";
}
stream << endl;
{
lock_guard<SpinLock> l(info_strings_lock_);
for (const string& key: info_strings_display_order_) {
stream << prefix << " " << key << ": " << info_strings_.find(key)->second << endl;
}
}
{
// Print all the event timers as the following:
// <EventKey> Timeline: 2s719ms
// - Event 1: 6.522us (6.522us)
// - Event 2: 2s288ms (2s288ms)
// - Event 3: 2s410ms (121.138ms)
// The times in parentheses are the time elapsed since the last event.
vector<EventSequence::Event> events;
lock_guard<SpinLock> l(event_sequence_lock_);
for (const EventSequenceMap::value_type& event_sequence: event_sequence_map_) {
// If the stopwatch has never been started (e.g. because this sequence came from
// Thrift), look for the last element to tell us the total runtime. For
// currently-updating sequences, it's better to use the stopwatch value because that
// updates continuously.
int64_t last = event_sequence.second->ElapsedTime();
event_sequence.second->GetEvents(&events);
if (last == 0 && events.size() > 0) last = events.back().second;
stream << prefix << " " << event_sequence.first << ": "
<< PrettyPrinter::Print(last, TUnit::TIME_NS)
<< endl;
int64_t prev = 0L;
event_sequence.second->GetEvents(&events);
for (const EventSequence::Event& event: events) {
stream << prefix << " - " << event.first << ": "
<< PrettyPrinter::Print(event.second, TUnit::TIME_NS) << " ("
<< PrettyPrinter::Print(event.second - prev, TUnit::TIME_NS) << ")"
<< endl;
prev = event.second;
}
}
}
{
// Print all time series counters as following:
// - <Name> (<period>): <val1>, <val2>, <etc>
lock_guard<SpinLock> l(counter_map_lock_);
for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
const TimeSeriesCounter* counter = v.second;
lock_guard<SpinLock> l(counter->lock_);
int num, period;
const int64_t* samples = counter->GetSamplesLocked(&num, &period);
if (num > 0) {
// Clamp number of printed values at 64, the maximum number of values in the
// SamplingTimeSeriesCounter.
int step = 1 + (num - 1) / 64;
period *= step;
stream << prefix << " - " << v.first << " ("
<< PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS) << "): ";
for (int i = 0; i < num; i += step) {
stream << PrettyPrinter::Print(samples[i], counter->unit());
if (i + step < num) stream << ", ";
}
if (step > 1) {
stream << " (Showing " << ((num + 1) / step) << " of " << num << " values from "
"Thrift Profile)";
}
stream << endl;
}
}
}
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
// Print all SummaryStatsCounters as following:
// <Name>: (Avg: <value> ; Min: <min_value> ; Max: <max_value> ;
// Number of samples: <total>)
for (const SummaryStatsCounterMap::value_type& v: summary_stats_map_) {
if (v.second->TotalNumValues() == 0) {
// No point printing all the stats if number of samples is zero.
stream << prefix << " - " << v.first << ": "
<< PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
<< " (Number of samples: " << v.second->TotalNumValues() << ")" << endl;
} else {
stream << prefix << " - " << v.first << ": (Avg: "
<< PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
<< " ; Min: "
<< PrettyPrinter::Print(v.second->MinValue(), v.second->unit(), true)
<< " ; Max: "
<< PrettyPrinter::Print(v.second->MaxValue(), v.second->unit(), true)
<< " ; Number of samples: " << v.second->TotalNumValues() << ")" << endl;
}
}
}
RuntimeProfile::PrintChildCounters(
prefix, ROOT_COUNTER, counter_map, child_counter_map, s);
// Create copy of children_ so we don't need to hold lock while we call
// PrettyPrint() on the children.
ChildVector children;
{
lock_guard<SpinLock> l(children_lock_);
children = children_;
}
for (int i = 0; i < children.size(); ++i) {
RuntimeProfile* profile = children[i].first;
bool indent = children[i].second;
profile->PrettyPrint(s, prefix + (indent ? " " : ""));
}
}
Status RuntimeProfile::SerializeToArchiveString(string* out) const {
stringstream ss;
RETURN_IF_ERROR(SerializeToArchiveString(&ss));
*out = ss.str();
return Status::OK();
}
Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
Status status;
TRuntimeProfileTree thrift_object;
const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object);
ThriftSerializer serializer(true);
vector<uint8_t> serialized_buffer;
RETURN_IF_ERROR(serializer.SerializeToVector(&thrift_object, &serialized_buffer));
// Compress the serialized thrift string. This uses string keys and is very
// easy to compress.
scoped_ptr<Codec> compressor;
Codec::CodecInfo codec_info(THdfsCompression::DEFAULT);
RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, codec_info, &compressor));
const auto close_compressor =
MakeScopeExitTrigger([&compressor]() { compressor->Close(); });
vector<uint8_t> compressed_buffer;
int64_t max_compressed_size = compressor->MaxOutputLen(serialized_buffer.size());
DCHECK_GT(max_compressed_size, 0);
compressed_buffer.resize(max_compressed_size);
int64_t result_len = compressed_buffer.size();
uint8_t* compressed_buffer_ptr = compressed_buffer.data();
RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(),
serialized_buffer.data(), &result_len, &compressed_buffer_ptr));
compressed_buffer.resize(result_len);
Base64Encode(compressed_buffer, out);
return Status::OK();;
}
Status RuntimeProfile::DeserializeFromArchiveString(
const std::string& archive_str, TRuntimeProfileTree* out) {
int64_t decoded_max;
if (!Base64DecodeBufLen(archive_str.c_str(), archive_str.size(), &decoded_max)) {
return Status("Error in DeserializeFromArchiveString: Base64DecodeBufLen failed.");
}
vector<uint8_t> decoded_buffer;
decoded_buffer.resize(decoded_max);
int64_t decoded_len;
if (!Base64Decode(archive_str.c_str(), archive_str.size(), decoded_max,
reinterpret_cast<char*>(decoded_buffer.data()), &decoded_len)) {
return Status("Error in DeserializeFromArchiveString: Base64Decode failed.");
}
decoded_buffer.resize(decoded_len);
scoped_ptr<Codec> decompressor;
MemTracker mem_tracker;
MemPool mem_pool(&mem_tracker);
const auto close_mem_tracker = MakeScopeExitTrigger([&mem_pool, &mem_tracker]() {
mem_pool.FreeAll();
mem_tracker.Close();
});
RETURN_IF_ERROR(Codec::CreateDecompressor(
&mem_pool, false, THdfsCompression::DEFAULT, &decompressor));
const auto close_decompressor =
MakeScopeExitTrigger([&decompressor]() { decompressor->Close(); });
int64_t result_len;
uint8_t* decompressed_buffer;
RETURN_IF_ERROR(decompressor->ProcessBlock(
false, decoded_len, decoded_buffer.data(), &result_len, &decompressed_buffer));
uint32_t deserialized_len = static_cast<uint32_t>(result_len);
RETURN_IF_ERROR(
DeserializeThriftMsg(decompressed_buffer, &deserialized_len, true, out));
return Status::OK();
}
void RuntimeProfile::SetTExecSummary(const TExecSummary& summary) {
lock_guard<SpinLock> l(t_exec_summary_lock_);
t_exec_summary_ = summary;
}
void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const {
tree->nodes.clear();
ToThrift(&tree->nodes);
ExecSummaryToThrift(tree);
}
void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
int index = nodes->size();
nodes->push_back(TRuntimeProfileNode());
TRuntimeProfileNode& node = (*nodes)[index];
node.name = name_;
// Set the required metadata field to the plan node ID for compatibility with any tools
// that rely on the plan node id being set there.
node.metadata = metadata_.__isset.plan_node_id ? metadata_.plan_node_id : -1;
// Thrift requires exactly one field of a union to be set so we only mark node_metadata
// as set if that is the case.
if (metadata_.__isset.plan_node_id || metadata_.__isset.data_sink_id) {
node.__set_node_metadata(metadata_);
}
node.indent = true;
CounterMap counter_map;
{
lock_guard<SpinLock> l(counter_map_lock_);
counter_map = counter_map_;
node.child_counters_map = child_counter_map_;
}
for (map<string, Counter*>::const_iterator iter = counter_map.begin();
iter != counter_map.end(); ++iter) {
TCounter counter;
counter.name = iter->first;
counter.value = iter->second->value();
counter.unit = iter->second->unit();
node.counters.push_back(counter);
}
{
lock_guard<SpinLock> l(info_strings_lock_);
node.info_strings = info_strings_;
node.info_strings_display_order = info_strings_display_order_;
}
{
vector<EventSequence::Event> events;
lock_guard<SpinLock> l(event_sequence_lock_);
if (event_sequence_map_.size() != 0) {
node.__set_event_sequences(vector<TEventSequence>());
node.event_sequences.resize(event_sequence_map_.size());
int idx = 0;
for (const EventSequenceMap::value_type& val: event_sequence_map_) {
TEventSequence* seq = &node.event_sequences[idx++];
seq->name = val.first;
val.second->GetEvents(&events);
for (const EventSequence::Event& ev: events) {
seq->labels.push_back(ev.first);
seq->timestamps.push_back(ev.second);
}
}
}
}
{
lock_guard<SpinLock> l(counter_map_lock_);
if (time_series_counter_map_.size() != 0) {
node.__set_time_series_counters(
vector<TTimeSeriesCounter>(time_series_counter_map_.size()));
int idx = 0;
for (const TimeSeriesCounterMap::value_type& val: time_series_counter_map_) {
val.second->ToThrift(&node.time_series_counters[idx++]);
}
}
}
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (summary_stats_map_.size() != 0) {
node.__set_summary_stats_counters(
vector<TSummaryStatsCounter>(summary_stats_map_.size()));
int idx = 0;
for (const SummaryStatsCounterMap::value_type& val: summary_stats_map_) {
val.second->ToThrift(&node.summary_stats_counters[idx++], val.first);
}
}
}
ChildVector children;
{
lock_guard<SpinLock> l(children_lock_);
children = children_;
node.num_children = children_.size();
}
for (int i = 0; i < children.size(); ++i) {
int child_idx = nodes->size();
children[i].first->ToThrift(nodes);
// fix up indentation flag
(*nodes)[child_idx].indent = children[i].second;
}
}
void RuntimeProfile::ExecSummaryToThrift(TRuntimeProfileTree* tree) const {
GetExecSummary(&tree->exec_summary);
tree->__isset.exec_summary = true;
}
void RuntimeProfile::GetExecSummary(TExecSummary* t_exec_summary) const {
lock_guard<SpinLock> l(t_exec_summary_lock_);
*t_exec_summary = t_exec_summary_;
}
void RuntimeProfile::SetPlanNodeId(int node_id) {
DCHECK(!metadata_.__isset.data_sink_id) << "Don't set conflicting metadata";
metadata_.__set_plan_node_id(node_id);
}
void RuntimeProfile::SetDataSinkId(int sink_id) {
DCHECK(!metadata_.__isset.plan_node_id) << "Don't set conflicting metadata";
metadata_.__set_data_sink_id(sink_id);
}
int64_t RuntimeProfile::UnitsPerSecond(
const RuntimeProfile::Counter* total_counter, const RuntimeProfile::Counter* timer) {
DCHECK(total_counter->unit() == TUnit::BYTES || total_counter->unit() == TUnit::UNIT);
DCHECK(timer->unit() == TUnit::TIME_NS);
if (timer->value() == 0) return 0;
double secs = static_cast<double>(timer->value()) / 1000.0 / 1000.0 / 1000.0;
return total_counter->value() / secs;
}
int64_t RuntimeProfile::CounterSum(const vector<Counter*>* counters) {
int64_t value = 0;
for (int i = 0; i < counters->size(); ++i) {
value += (*counters)[i]->value();
}
return value;
}
RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
const string& name, Counter* src_counter) {
TUnit::type dst_unit;
switch (src_counter->unit()) {
case TUnit::BYTES:
dst_unit = TUnit::BYTES_PER_SECOND;
break;
case TUnit::UNIT:
dst_unit = TUnit::UNIT_PER_SECOND;
break;
default:
DCHECK(false) << "Unsupported src counter unit: " << src_counter->unit();
return NULL;
}
{
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
if (!created) return dst_counter;
rate_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
PeriodicCounterUpdater::RATE_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
}
RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
const string& name, SampleFunction fn, TUnit::type dst_unit) {
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
if (!created) return dst_counter;
rate_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter,
PeriodicCounterUpdater::RATE_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
const string& name, Counter* src_counter) {
DCHECK(src_counter->unit() == TUnit::UNIT);
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
if (!created) return dst_counter;
sampling_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
PeriodicCounterUpdater::SAMPLING_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
const string& name, SampleFunction sample_fn) {
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
if (!created) return dst_counter;
sampling_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter,
PeriodicCounterUpdater::SAMPLING_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters(
Counter* src_counter, int num_buckets) {
lock_guard<SpinLock> l(counter_map_lock_);
vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>);
for (int i = 0; i < num_buckets; ++i) {
buckets->push_back(
pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
}
bucketing_counters_.insert(buckets);
has_active_periodic_counters_ = true;
PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets);
return buckets;
}
RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name) {
lock_guard<SpinLock> l(event_sequence_lock_);
EventSequenceMap::iterator timer_it = event_sequence_map_.find(name);
if (timer_it != event_sequence_map_.end()) return timer_it->second;
EventSequence* timer = pool_->Add(new EventSequence());
event_sequence_map_[name] = timer;
return timer;
}
RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name,
const TEventSequence& from) {
lock_guard<SpinLock> l(event_sequence_lock_);
EventSequenceMap::iterator timer_it = event_sequence_map_.find(name);
if (timer_it != event_sequence_map_.end()) return timer_it->second;
EventSequence* timer = pool_->Add(new EventSequence(from.timestamps, from.labels));
event_sequence_map_[name] = timer;
return timer;
}
void RuntimeProfile::PrintChildCounters(const string& prefix,
const string& counter_name, const CounterMap& counter_map,
const ChildCounterMap& child_counter_map, ostream* s) {
ostream& stream = *s;
ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
if (itr != child_counter_map.end()) {
const set<string>& child_counters = itr->second;
for (const string& child_counter: child_counters) {
CounterMap::const_iterator iter = counter_map.find(child_counter);
if (iter == counter_map.end()) continue;
stream << prefix << " - " << iter->first << ": "
<< PrettyPrinter::Print(iter->second->value(), iter->second->unit(), true)
<< endl;
RuntimeProfile::PrintChildCounters(prefix + " ", child_counter, counter_map,
child_counter_map, s);
}
}
}
RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
const string& name, TUnit::type unit, const std::string& parent_counter_name) {
DCHECK_EQ(is_averaged_profile_, false);
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
return summary_stats_map_[name];
}
SummaryStatsCounter* counter = pool_->Add(new SummaryStatsCounter(unit));
summary_stats_map_[name] = counter;
return counter;
}
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
const string& name, TUnit::type unit, SampleFunction fn) {
DCHECK(fn != nullptr);
lock_guard<SpinLock> l(counter_map_lock_);
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
if (it != time_series_counter_map_.end()) return it->second;
TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn));
time_series_counter_map_[name] = counter;
PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
has_active_periodic_counters_ = true;
return counter;
}
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
const string& name, Counter* src_counter) {
DCHECK(src_counter != NULL);
return AddSamplingTimeSeriesCounter(name, src_counter->unit(),
bind(&Counter::value, src_counter));
}
void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) {
lock_guard<SpinLock> l(lock_);
int64_t sample = sample_fn_();
AddSampleLocked(sample, ms_elapsed);
}
const int64_t* RuntimeProfile::TimeSeriesCounter::GetSamplesLockedForSend(
int* num_samples, int* period) {
return GetSamplesLocked(num_samples, period);
}
void RuntimeProfile::TimeSeriesCounter::SetSamples(
int period, const std::vector<int64_t>& samples, int64_t start_idx) {
DCHECK(false);
}
void RuntimeProfile::SamplingTimeSeriesCounter::AddSampleLocked(
int64_t sample, int ms_elapsed){
samples_.AddSample(sample, ms_elapsed);
}
const int64_t* RuntimeProfile::SamplingTimeSeriesCounter::GetSamplesLocked(
int* num_samples, int* period) const {
return samples_.GetSamples(num_samples, period);
}
RuntimeProfile::ChunkedTimeSeriesCounter::ChunkedTimeSeriesCounter(
const string& name, TUnit::type unit, SampleFunction fn)
: TimeSeriesCounter(name, unit, fn)
, period_(FLAGS_periodic_counter_update_period_ms)
, max_size_(10 * FLAGS_status_report_interval_ms / period_) {}
void RuntimeProfile::ChunkedTimeSeriesCounter::Clear() {
lock_guard<SpinLock> l(lock_);
previous_sample_count_ += last_get_count_;
values_.erase(values_.begin(), values_.begin() + last_get_count_);
last_get_count_ = 0;
}
void RuntimeProfile::ChunkedTimeSeriesCounter::AddSampleLocked(
int64_t sample, int ms_elapsed) {
// We chose inefficiently erasing elements from a vector over using a std::deque because
// this should only happen very infrequently and we rely on contiguous storage in
// GetSamplesLocked*().
if (max_size_ > 0 && values_.size() == max_size_) {
KLOG_EVERY_N_SECS(WARNING, 60) << "ChunkedTimeSeriesCounter reached maximum size";
values_.erase(values_.begin(), values_.begin() + 1);
}
DCHECK_LT(values_.size(), max_size_);
values_.push_back(sample);
}
const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLocked(
int* num_samples, int* period) const {
DCHECK(num_samples != nullptr);
DCHECK(period != nullptr);
*num_samples = values_.size();
*period = period_;
return values_.data();
}
const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLockedForSend(
int* num_samples, int* period) {
last_get_count_ = values_.size();
return GetSamplesLocked(num_samples, period);
}
void RuntimeProfile::ChunkedTimeSeriesCounter::SetSamples(
int period, const std::vector<int64_t>& samples, int64_t start_idx) {
lock_guard<SpinLock> l(lock_);
if (start_idx == 0) {
// This could be coming from a SamplingTimeSeriesCounter or another
// ChunkedTimeSeriesCounter.
period_ = period;
values_ = samples;
return;
}
// Only ChunkedTimeSeriesCounter will set start_idx > 0.
DCHECK_GT(start_idx, 0);
DCHECK_EQ(period_, period);
if (values_.size() < start_idx) {
// Fill up with 0.
values_.resize(start_idx);
}
DCHECK_GE(values_.size(), start_idx);
// Skip values we already received.
auto start_it = samples.begin() + values_.size() - start_idx;
values_.insert(values_.end(), start_it, samples.end());
}
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddChunkedTimeSeriesCounter(
const string& name, TUnit::type unit, SampleFunction fn) {
DCHECK(fn != nullptr);
lock_guard<SpinLock> l(counter_map_lock_);
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
if (it != time_series_counter_map_.end()) return it->second;
TimeSeriesCounter* counter = pool_->Add(new ChunkedTimeSeriesCounter(name, unit, fn));
time_series_counter_map_[name] = counter;
PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
has_active_periodic_counters_ = true;
return counter;
}
void RuntimeProfile::ClearChunkedTimeSeriesCounters() {
{
lock_guard<SpinLock> l(counter_map_lock_);
for (auto& it : time_series_counter_map_) it.second->Clear();
}
{
lock_guard<SpinLock> l(children_lock_);
for (int i = 0; i < children_.size(); ++i) {
children_[i].first->ClearChunkedTimeSeriesCounters();
}
}
}
void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
lock_guard<SpinLock> l(lock_);
int num, period;
const int64_t* samples = GetSamplesLockedForSend(&num, &period);
counter->values.resize(num);
Ubsan::MemCpy(counter->values.data(), samples, num * sizeof(int64_t));
counter->name = name_;
counter->unit = unit_;
counter->period_ms = period;
counter->__set_start_index(previous_sample_count_);
}
void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) {
lock_guard<SpinLock> l(lock_);
/// It's possible that concurrent events can be logged out of sequence so we sort the
/// events before serializing them.
SortEvents();
for (const EventSequence::Event& ev: events_) {
seq->labels.push_back(ev.first);
seq->timestamps.push_back(ev.second);
}
}
void RuntimeProfile::SummaryStatsCounter::ToThrift(TSummaryStatsCounter* counter,
const std::string& name) {
lock_guard<SpinLock> l(lock_);
counter->name = name;
counter->unit = unit_;
counter->sum = sum_;
counter->total_num_values = total_num_values_;
counter->min_value = min_;
counter->max_value = max_;
}
void RuntimeProfile::SummaryStatsCounter::UpdateCounter(int64_t new_value) {
lock_guard<SpinLock> l(lock_);
++total_num_values_;
sum_ += new_value;
value_.Store(sum_ / total_num_values_);
if (new_value < min_) min_ = new_value;
if (new_value > max_) max_ = new_value;
}
void RuntimeProfile::SummaryStatsCounter::SetStats(const TSummaryStatsCounter& counter) {
// We drop this input if it looks malformed.
if (counter.total_num_values < 0) return;
lock_guard<SpinLock> l(lock_);
unit_ = counter.unit;
sum_ = counter.sum;
total_num_values_ = counter.total_num_values;
min_ = counter.min_value;
max_ = counter.max_value;
value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
}
int64_t RuntimeProfile::SummaryStatsCounter::MinValue() {
lock_guard<SpinLock> l(lock_);
return min_;
}
int64_t RuntimeProfile::SummaryStatsCounter::MaxValue() {
lock_guard<SpinLock> l(lock_);
return max_;
}
int32_t RuntimeProfile::SummaryStatsCounter::TotalNumValues() {
lock_guard<SpinLock> l(lock_);
return total_num_values_;
}
void RuntimeProfile::Counter::ToJson(Document& document, Value* val) const {
Value counter_json(kObjectType);
counter_json.AddMember("value", value(), document.GetAllocator());
auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_);
DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end());
Value unit_json(unit_itr->second, document.GetAllocator());
counter_json.AddMember("unit", unit_json, document.GetAllocator());
Value kind_json(CounterType().c_str(), document.GetAllocator());
counter_json.AddMember("kind", kind_json, document.GetAllocator());
*val = counter_json;
}
void RuntimeProfile::TimeSeriesCounter::ToJson(Document& document, Value* val) {
lock_guard<SpinLock> lock(lock_);
Value counter_json(kObjectType);
counter_json.AddMember("counter_name",
StringRef(name_.c_str()), document.GetAllocator());
auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_);
DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end());
Value unit_json(unit_itr->second, document.GetAllocator());
counter_json.AddMember("unit", unit_json, document.GetAllocator());
int num, period;
const int64_t* samples = GetSamplesLocked(&num, &period);
counter_json.AddMember("num", num, document.GetAllocator());
counter_json.AddMember("period", period, document.GetAllocator());
stringstream stream;
// Clamp number of printed values at 64, the maximum number of values in the
// SamplingTimeSeriesCounter.
int step = 1 + (num - 1) / 64;
period *= step;
for (int i = 0; i < num; i += step) {
stream << samples[i];
if (i + step < num) stream << ",";
}
Value samples_data_json(stream.str().c_str(), document.GetAllocator());
counter_json.AddMember("data", samples_data_json, document.GetAllocator());
*val = counter_json;
}
void RuntimeProfile::EventSequence::ToJson(Document& document, Value* value) {
boost::lock_guard<SpinLock> event_lock(lock_);
SortEvents();
Value event_sequence_json(kObjectType);
event_sequence_json.AddMember("offset", offset_, document.GetAllocator());
Value events_json(kArrayType);
for (const Event& ev: events_) {
Value event_json(kObjectType);
event_json.AddMember("label", StringRef(ev.first.c_str()), document.GetAllocator());
event_json.AddMember("timestamp", ev.second, document.GetAllocator());
events_json.PushBack(event_json, document.GetAllocator());
}
event_sequence_json.AddMember("events", events_json, document.GetAllocator());
*value = event_sequence_json;
}
}