// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/runtime-profile-counters.h"
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <type_traits>
#include <utility>
#include <boost/algorithm/string/join.hpp>
#include <boost/bind.hpp>
#include "common/object-pool.h"
#include "gutil/strings/strip.h"
#include "kudu/util/logging.h"
#include "rpc/thrift-util.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "util/coding-util.h"
#include "util/compress.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/periodic-counter-updater.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "util/scope-exit-trigger.h"
#include "util/ubsan.h"
#include "common/names.h"
DECLARE_int32(status_report_interval_ms);
DECLARE_int32(periodic_counter_update_period_ms);
DEFINE_bool_hidden(gen_experimental_profile, false,
"(Experimental) generate a new aggregated runtime profile layout. Format is subject "
"to change.");
using namespace rapidjson;
namespace impala {
// Thread counter names.
static const string THREAD_TOTAL_TIME = "TotalWallClockTime";
static const string THREAD_USER_TIME = "UserTime";
static const string THREAD_SYS_TIME = "SysTime";
static const string THREAD_VOLUNTARY_CONTEXT_SWITCHES = "VoluntaryContextSwitches";
static const string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryContextSwitches";
// The root counter name for all top level counters.
static const string ROOT_COUNTER = "";
const string RuntimeProfileBase::TOTAL_TIME_COUNTER_NAME = "TotalTime";
const string RuntimeProfileBase::LOCAL_TIME_COUNTER_NAME = "LocalTime";
const string RuntimeProfileBase::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
constexpr ProfileEntryPrototype::Significance ProfileEntryPrototype::ALLSIGNIFICANCE[];
/// Helper to interpret the bit pattern of 'val' as T, which can either be an int64_t or
/// a double.
template <typename T>
static T BitcastFromInt64(int64_t val) {
static_assert(std::is_same<T, int64_t>::value || std::is_same<T, double>::value,
"Only double and int64_t are supported");
T res;
memcpy(&res, &val, sizeof(int64_t));
return res;
}
/// Helper to store the bit pattern of 'val' as an int64_t, T can either be an int64_t or
/// a double.
template <typename T>
static int64_t BitcastToInt64(T val) {
static_assert(std::is_same<T, int64_t>::value || std::is_same<T, double>::value,
"Only double and int64_t are supported");
int64_t res;
memcpy(&res, &val, sizeof(int64_t));
return res;
}
void ProfileEntryPrototypeRegistry::AddPrototype(const ProfileEntryPrototype* prototype) {
lock_guard<SpinLock> l(lock_);
DCHECK(prototypes_.find(prototype->name()) == prototypes_.end()) <<
"Found duplicate prototype name: " << prototype->name();
prototypes_.emplace(prototype->name(), prototype);
}
void ProfileEntryPrototypeRegistry::GetPrototypes(
vector<const ProfileEntryPrototype*>* out) {
lock_guard<SpinLock> l(lock_);
out->reserve(prototypes_.size());
for (auto p : prototypes_) out->push_back(p.second);
}
ProfileEntryPrototype::ProfileEntryPrototype(const char* name, Significance significance,
const char* desc, TUnit::type unit) :
name_(name), significance_(significance), desc_(desc),
unit_(unit) {
ProfileEntryPrototypeRegistry::get()->AddPrototype(this);
}
const char* ProfileEntryPrototype::SignificanceString(
ProfileEntryPrototype::Significance significance) {
switch (significance) {
case Significance::STABLE_HIGH:
return "STABLE & HIGH";
case Significance::STABLE_LOW:
return "STABLE & LOW";
case Significance::UNSTABLE:
return "UNSTABLE";
case Significance::DEBUG:
return "DEBUG";
default:
DCHECK(false);
return "";
}
}
const char* ProfileEntryPrototype::SignificanceDescription(
ProfileEntryPrototype::Significance significance) {
switch (significance) {
case Significance::STABLE_HIGH:
return "High level and stable counters - always useful for measuring query "
"performance and status. Counters that everyone is interested. should "
"rarely change and if it does we will make some effort to notify users.";
case Significance::STABLE_LOW:
return "Low level and stable counters - interesting counters to monitor and "
"analyze by machine. It will probably be interesting under some "
"circumstances for users.";
case Significance::UNSTABLE:
return "Unstable but useful - useful to understand query performance, but subject"
" to change, particularly if the implementation changes. E.g. "
"RowBatchQueuePutWaitTime, MaterializeTupleTimer";
case Significance::DEBUG:
return "Debugging counters - generally not useful to users of Impala, the main"
" use case is low-level debugging. Can be hidden to reduce noise for "
"most consumers of profiles.";
default:
DCHECK(false);
return "";
}
}
RuntimeProfileBase::RuntimeProfileBase(ObjectPool* pool, const string& name)
: pool_(pool), name_(name) {}
RuntimeProfileBase::~RuntimeProfileBase() {}
RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name) {
return pool->Add(new RuntimeProfile(pool, name));
}
RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name)
: RuntimeProfileBase(pool, name) {
set<string>& root_counters = child_counter_map_[ROOT_COUNTER];
counter_map_[TOTAL_TIME_COUNTER_NAME] = &counter_total_time_;
root_counters.emplace(TOTAL_TIME_COUNTER_NAME);
counter_map_[INACTIVE_TIME_COUNTER_NAME] = &inactive_timer_;
root_counters.emplace(INACTIVE_TIME_COUNTER_NAME);
}
RuntimeProfile::~RuntimeProfile() {
DCHECK(!has_active_periodic_counters_);
}
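// Stop updating any rate, sampling, bucketing and time series counters that are still
// registered with PeriodicCounterUpdater. The destructor DCHECKs that this has been
// done before the profile is destroyed.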
void RuntimeProfile::StopPeriodicCounters() {
lock_guard<SpinLock> l(counter_map_lock_);
if (!has_active_periodic_counters_) return;
for (Counter* sampling_counter : sampling_counters_) {
PeriodicCounterUpdater::StopSamplingCounter(sampling_counter);
}
for (Counter* rate_counter : rate_counters_) {
PeriodicCounterUpdater::StopRateCounter(rate_counter);
}
for (vector<Counter*>* counters : bucketing_counters_) {
PeriodicCounterUpdater::StopBucketingCounters(counters);
}
for (auto& time_series_counter_entry : time_series_counter_map_) {
PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_counter_entry.second);
}
has_active_periodic_counters_ = false;
}
RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
const TRuntimeProfileTree& profiles) {
if (profiles.nodes.size() == 0) return NULL;
int idx = 0;
RuntimeProfileBase* profile =
RuntimeProfileBase::CreateFromThriftHelper(pool, profiles.nodes, &idx);
// The root must always be a RuntimeProfile, not an AggregatedProfile.
RuntimeProfile* root = dynamic_cast<RuntimeProfile*>(profile);
DCHECK(root != nullptr);
root->SetTExecSummary(profiles.exec_summary);
// Some values like local time are not serialized to Thrift and need to be
// recomputed.
root->ComputeTimeInProfile();
return root;
}
RuntimeProfileBase* RuntimeProfileBase::CreateFromThriftHelper(
ObjectPool* pool, const vector<TRuntimeProfileNode>& nodes, int* idx) {
DCHECK_LT(*idx, nodes.size());
const TRuntimeProfileNode& node = nodes[*idx];
RuntimeProfileBase* profile;
if (FLAGS_gen_experimental_profile && node.__isset.aggregated) {
DCHECK(node.aggregated.__isset.num_instances);
profile = AggregatedRuntimeProfile::Create(pool, node.name,
node.aggregated.num_instances, node.aggregated.__isset.input_profiles);
} else {
// If we're not using the transposed profile representation, just convert
// the averaged profile to a regular profile (this is what this code
// always did in the past).
profile = RuntimeProfile::Create(pool, node.name);
}
profile->metadata_ = node.node_metadata;
profile->InitFromThrift(node, pool);
profile->child_counter_map_ = node.child_counters_map;
// TODO: IMPALA-9382: move to RuntimeProfile::InitFromThrift() once 'info_strings_' is
// moved.
profile->info_strings_ = node.info_strings;
profile->info_strings_display_order_ = node.info_strings_display_order;
++*idx;
{
lock_guard<SpinLock> l(profile->children_lock_);
for (int i = 0; i < node.num_children; ++i) {
bool indent = nodes[*idx].indent;
profile->AddChildLocked(
RuntimeProfileBase::CreateFromThriftHelper(pool, nodes, idx),
indent, profile->children_.end());
}
}
return profile;
}
void RuntimeProfile::InitFromThrift(const TRuntimeProfileNode& node, ObjectPool* pool) {
// Only read 'counters' for non-aggregated profiles. Aggregated profiles will populate
// 'counter_map_' from the aggregated counters in thrift.
for (int i = 0; i < node.counters.size(); ++i) {
const TCounter& counter = node.counters[i];
counter_map_[counter.name] = pool->Add(new Counter(counter.unit, counter.value));
}
if (node.__isset.event_sequences) {
for (const TEventSequence& sequence: node.event_sequences) {
event_sequence_map_[sequence.name] =
pool->Add(new EventSequence(sequence.timestamps, sequence.labels));
}
}
if (node.__isset.time_series_counters) {
for (const TTimeSeriesCounter& val: node.time_series_counters) {
// Capture all incoming time series counters with the same type since re-sampling
// will have happened on the sender side.
time_series_counter_map_[val.name] = pool->Add(
new ChunkedTimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
}
}
if (node.__isset.summary_stats_counters) {
for (const TSummaryStatsCounter& val: node.summary_stats_counters) {
summary_stats_map_[val.name] = pool->Add(new SummaryStatsCounter(
val.unit, val.total_num_values, val.min_value, val.max_value, val.sum));
}
}
}
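// Merge the values from instance profile 'other' into this aggregated profile tree at
// instance index 'idx', then recompute the derived time counters.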
void AggregatedRuntimeProfile::Update(RuntimeProfile* other, int idx) {
{
lock_guard<SpinLock> l(input_profile_name_lock_);
DCHECK(!input_profile_names_.empty())
<< "Update() can only be called on root of averaged profile tree";
input_profile_names_[idx] = other->name();
}
UpdateRecursive(other, idx);
// Recursively compute times on the whole tree.
ComputeTimeInProfile();
}
void AggregatedRuntimeProfile::UpdateRecursive(RuntimeProfile* other, int idx) {
DCHECK(other != NULL);
DCHECK_GE(idx, 0);
DCHECK_LT(idx, num_input_profiles_);
// Merge this level
{
CounterMap::iterator dst_iter;
CounterMap::const_iterator src_iter;
lock_guard<SpinLock> l(counter_map_lock_);
lock_guard<SpinLock> m(other->counter_map_lock_);
for (src_iter = other->counter_map_.begin();
src_iter != other->counter_map_.end(); ++src_iter) {
dst_iter = counter_map_.find(src_iter->first);
AveragedCounter* avg_counter;
// Get the counter with the same name in 'counter_map_' (the destination map).
// Create one if it doesn't exist.
if (dst_iter == counter_map_.end()) {
avg_counter = pool_->Add(
new AveragedCounter(src_iter->second->unit(), num_input_profiles_));
counter_map_[src_iter->first] = avg_counter;
} else {
DCHECK(dst_iter->second->unit() == src_iter->second->unit());
avg_counter = static_cast<AveragedCounter*>(dst_iter->second);
}
avg_counter->UpdateCounter(src_iter->second, idx);
}
ChildCounterMap::const_iterator child_counter_src_itr;
for (child_counter_src_itr = other->child_counter_map_.begin();
child_counter_src_itr != other->child_counter_map_.end();
++child_counter_src_itr) {
set<string>* child_counters = FindOrInsert(&child_counter_map_,
child_counter_src_itr->first, set<string>());
child_counters->insert(child_counter_src_itr->second.begin(),
child_counter_src_itr->second.end());
}
}
if (FLAGS_gen_experimental_profile) {
lock_guard<SpinLock> l(agg_info_strings_lock_);
lock_guard<SpinLock> m(other->info_strings_lock_);
for (const auto& entry : other->info_strings_) {
vector<string>& values = agg_info_strings_[entry.first];
if (values.empty()) values.resize(num_input_profiles_);
if (values[idx] != entry.second) values[idx] = entry.second;
}
}
if (FLAGS_gen_experimental_profile) {
// Merge summary stats.
lock_guard<SpinLock> l(summary_stats_map_lock_);
lock_guard<SpinLock> m(other->summary_stats_map_lock_);
for (const RuntimeProfile::SummaryStatsCounterMap::value_type& src_entry :
other->summary_stats_map_) {
DCHECK_GT(num_input_profiles_, 0);
AggSummaryStatsCounterMap::mapped_type& agg_entry =
summary_stats_map_[src_entry.first];
vector<SummaryStatsCounter*>& agg_instance_counters = agg_entry.second;
if (agg_instance_counters.empty()) {
agg_instance_counters.resize(num_input_profiles_);
agg_entry.first = src_entry.second->unit();
} else {
DCHECK_EQ(agg_entry.first, src_entry.second->unit()) << "Unit must be consistent";
}
// Get the counter with the same name. Create one if it doesn't exist.
if (agg_instance_counters[idx] == nullptr) {
agg_instance_counters[idx] =
pool_->Add(new SummaryStatsCounter(src_entry.second->unit()));
}
// Overwrite the previous value with the new value.
agg_instance_counters[idx]->SetStats(*src_entry.second);
}
}
{
lock_guard<SpinLock> l(children_lock_);
lock_guard<SpinLock> m(other->children_lock_);
// Recursively merge children with matching names.
// Track the current position in the vector so we preserve the order of children
// if children are added after the first Update() call (IMPALA-6694).
// E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
// then this code makes sure that children_ is [A, B, C, D] afterwards.
ChildVector::iterator insert_pos = children_.begin();
for (int i = 0; i < other->children_.size(); ++i) {
RuntimeProfile* other_child =
dynamic_cast<RuntimeProfile*>(other->children_[i].first);
DCHECK(other_child != nullptr)
<< other->children_[i].first->name() << " must be a RuntimeProfile";
ChildMap::iterator j = child_map_.find(other_child->name());
AggregatedRuntimeProfile* child = NULL;
if (j != child_map_.end()) {
child = dynamic_cast<AggregatedRuntimeProfile*>(j->second);
DCHECK(child != nullptr);
// Search forward until the insert position is either at the end of the vector
// or after this child. This preserves the order if the relative order of
// children in all updates is consistent.
bool found_child = false;
while (insert_pos != children_.end() && !found_child) {
found_child = insert_pos->first == child;
++insert_pos;
}
} else {
child =
Create(pool_, other_child->name(), num_input_profiles_, /*is_root=*/false);
child->metadata_ = other_child->metadata();
bool indent_other_child = other->children_[i].second;
child_map_[child->name_] = child;
insert_pos = children_.insert(insert_pos, make_pair(child, indent_other_child));
++insert_pos;
}
child->UpdateRecursive(other_child, idx);
}
}
}
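// Merge an incoming Thrift profile tree into this profile in place, then recompute the
// derived time counters for the whole tree.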
void RuntimeProfile::Update(const TRuntimeProfileTree& thrift_profile) {
int idx = 0;
Update(thrift_profile.nodes, &idx);
DCHECK_EQ(idx, thrift_profile.nodes.size());
// Re-compute the total time for the entire profile tree.
ComputeTimeInProfile();
}
void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx) {
if (UNLIKELY(nodes.size() == 0)) return;
DCHECK_LT(*idx, nodes.size());
const TRuntimeProfileNode& node = nodes[*idx];
{
// Update this level.
map<string, Counter*>::iterator dst_iter;
lock_guard<SpinLock> l(counter_map_lock_);
for (int i = 0; i < node.counters.size(); ++i) {
const TCounter& tcounter = node.counters[i];
CounterMap::iterator j = counter_map_.find(tcounter.name);
if (j == counter_map_.end()) {
counter_map_[tcounter.name] =
pool_->Add(new Counter(tcounter.unit, tcounter.value));
} else {
if (j->second->unit() != tcounter.unit) {
LOG(ERROR) << "Cannot update counters with the same name ("
<< j->first << ") but different units.";
} else {
j->second->Set(tcounter.value);
}
}
}
ChildCounterMap::const_iterator child_counter_src_itr;
for (child_counter_src_itr = node.child_counters_map.begin();
child_counter_src_itr != node.child_counters_map.end();
++child_counter_src_itr) {
set<string>* child_counters = FindOrInsert(&child_counter_map_,
child_counter_src_itr->first, set<string>());
child_counters->insert(child_counter_src_itr->second.begin(),
child_counter_src_itr->second.end());
}
for (int i = 0; i < node.time_series_counters.size(); ++i) {
const TTimeSeriesCounter& c = node.time_series_counters[i];
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
if (it == time_series_counter_map_.end()) {
// Capture all incoming time series counters with the same type since re-sampling
// will have happened on the sender side.
time_series_counter_map_[c.name] = pool_->Add(
new ChunkedTimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
} else {
int64_t start_idx = c.__isset.start_index ? c.start_index : 0;
it->second->SetSamples(c.period_ms, c.values, start_idx);
}
}
}
{
const InfoStrings& info_strings = node.info_strings;
lock_guard<SpinLock> l(info_strings_lock_);
for (const string& key: node.info_strings_display_order) {
// Look for existing info strings and update in place. If there
// are new strings, add them to the end of the display order.
// TODO: Is nodes.info_strings always a superset of
// info_strings_? If so, can just copy the display order.
InfoStrings::const_iterator it = info_strings.find(key);
DCHECK(it != info_strings.end());
InfoStrings::iterator existing = info_strings_.find(key);
if (existing == info_strings_.end()) {
info_strings_.emplace(key, it->second);
info_strings_display_order_.push_back(key);
} else {
info_strings_[key] = it->second;
}
}
}
{
lock_guard<SpinLock> l(event_sequence_lock_);
for (int i = 0; i < node.event_sequences.size(); ++i) {
const TEventSequence& seq = node.event_sequences[i];
EventSequenceMap::iterator it = event_sequence_map_.find(seq.name);
if (it == event_sequence_map_.end()) {
event_sequence_map_[seq.name] =
pool_->Add(new EventSequence(seq.timestamps, seq.labels));
} else {
it->second->AddNewerEvents(seq.timestamps, seq.labels);
}
}
}
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
for (int i = 0; i < node.summary_stats_counters.size(); ++i) {
const TSummaryStatsCounter& c = node.summary_stats_counters[i];
SummaryStatsCounterMap::iterator it = summary_stats_map_.find(c.name);
if (it == summary_stats_map_.end()) {
summary_stats_map_[c.name] =
pool_->Add(new SummaryStatsCounter(
c.unit, c.total_num_values, c.min_value, c.max_value, c.sum));
} else {
it->second->SetStats(c);
}
}
}
++*idx;
{
lock_guard<SpinLock> l(children_lock_);
// Track the current position in the vector so we preserve the order of children
// if children are added after the first Update() call (IMPALA-6694).
// E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
// then this code makes sure that children_ is [A, B, C, D] afterwards.
ChildVector::iterator insert_pos = children_.begin();
// Update children with matching names; create new ones if they don't match.
for (int i = 0; i < node.num_children; ++i) {
const TRuntimeProfileNode& tchild = nodes[*idx];
ChildMap::iterator j = child_map_.find(tchild.name);
RuntimeProfile* child = NULL;
if (j != child_map_.end()) {
child = dynamic_cast<RuntimeProfile*>(j->second);
DCHECK(child != nullptr) << j->second->name() << " must be a RuntimeProfile";
// Search forward until the insert position is either at the end of the vector
// or after this child. This preserves the order if the relative order of
// children in all updates is consistent.
bool found_child = false;
while (insert_pos != children_.end() && !found_child) {
found_child = insert_pos->first == child;
++insert_pos;
}
} else {
child = Create(pool_, tchild.name);
child->metadata_ = tchild.node_metadata;
child_map_[tchild.name] = child;
insert_pos = children_.insert(insert_pos, make_pair(child, tchild.indent));
++insert_pos;
}
child->Update(nodes, idx);
}
}
}
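// Compute total_time_ns_, local_time_ns_ and local_time_frac_ for this profile and all
// of its descendants.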
void RuntimeProfileBase::ComputeTimeInProfile() {
// Recurse on children. After this, children's total time is up to date.
int64_t children_total_time = 0;
{
lock_guard<SpinLock> l(children_lock_);
for (int i = 0; i < children_.size(); ++i) {
children_[i].first->ComputeTimeInProfile();
children_total_time += children_[i].first->total_time();
}
}
// IMPALA-5200: Take the max, because the parent includes all of the time from the
// children, whether or not its total time counter has been updated recently enough
// to see this.
int64_t total_time_ns = max(children_total_time, total_time_counter()->value());
// If a local time counter exists, use its value as local time. Otherwise, derive the
// local time from total time and the child time.
int64_t local_time_ns = 0;
bool has_local_time_counter = false;
{
lock_guard<SpinLock> l(counter_map_lock_);
CounterMap::const_iterator itr = counter_map_.find(LOCAL_TIME_COUNTER_NAME);
if (itr != counter_map_.end()) {
local_time_ns = itr->second->value();
has_local_time_counter = true;
}
}
if (!has_local_time_counter) {
local_time_ns = total_time_ns - children_total_time;
local_time_ns -= inactive_timer()->value();
}
// Counters have some margin of error, so clamp negative values to 0.
local_time_ns = ::max<int64_t>(0, local_time_ns);
double local_time_frac =
min(1.0, static_cast<double>(local_time_ns) / total_time_ns);
total_time_ns_.Store(total_time_ns);
local_time_ns_.Store(local_time_ns);
local_time_frac_.Store(BitcastToInt64(local_time_frac));
}
void RuntimeProfile::AddChild(
RuntimeProfileBase* child, bool indent, RuntimeProfile* loc) {
lock_guard<SpinLock> l(children_lock_);
ChildVector::iterator insert_pos;
if (loc == NULL) {
insert_pos = children_.end();
} else {
bool found = false;
for (ChildVector::iterator it = children_.begin(); it != children_.end(); ++it) {
if (it->first == loc) {
insert_pos = it + 1;
found = true;
break;
}
}
DCHECK(found) << "Invalid loc";
}
AddChildLocked(child, indent, insert_pos);
}
void RuntimeProfileBase::AddChildLocked(
RuntimeProfileBase* child, bool indent, ChildVector::iterator insert_pos) {
children_lock_.DCheckLocked();
DCHECK(child != NULL);
if (child_map_.count(child->name()) > 0) {
// This child has already been added, so do nothing.
// Otherwise, the map and vector will be out of sync.
return;
}
child_map_[child->name()] = child;
children_.insert(insert_pos, make_pair(child, indent));
}
void RuntimeProfile::PrependChild(RuntimeProfileBase* child, bool indent) {
lock_guard<SpinLock> l(children_lock_);
AddChildLocked(child, indent, children_.begin());
}
RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent,
bool prepend) {
lock_guard<SpinLock> l(children_lock_);
DCHECK(child_map_.find(name) == child_map_.end());
RuntimeProfile* child = Create(pool_, name);
AddChildLocked(child, indent, prepend ? children_.begin() : children_.end());
return child;
}
void RuntimeProfileBase::GetChildren(vector<RuntimeProfileBase*>* children) {
children->clear();
lock_guard<SpinLock> l(children_lock_);
for (const auto& entry : children_) children->push_back(entry.first);
}
void RuntimeProfileBase::GetAllChildren(vector<RuntimeProfileBase*>* children) {
lock_guard<SpinLock> l(children_lock_);
for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
children->push_back(i->second);
i->second->GetAllChildren(children);
}
}
int RuntimeProfileBase::num_counters() const {
std::lock_guard<SpinLock> l(counter_map_lock_);
return counter_map_.size();
}
void RuntimeProfile::SortChildrenByTotalTime() {
lock_guard<SpinLock> l(children_lock_);
// Create a snapshot of total time values so that they don't change while we're
// sorting. Sort the <total_time, index> pairs, then reshuffle children_.
vector<pair<int64_t, int64_t>> total_times;
for (int i = 0; i < children_.size(); ++i) {
total_times.emplace_back(children_[i].first->total_time_counter()->value(), i);
}
// Order by descending total time.
sort(total_times.begin(), total_times.end(),
[](const pair<int64_t, int64_t>& p1, const pair<int64_t, int64_t>& p2) {
return p1.first > p2.first;
});
ChildVector new_children;
for (const auto& p : total_times) new_children.emplace_back(children_[p.second]);
children_ = move(new_children);
}
void RuntimeProfileBase::AddInfoString(const string& key, const string& value) {
return AddInfoStringInternal(key, value, false);
}
void RuntimeProfile::AddInfoStringRedacted(const string& key, const string& value) {
return AddInfoStringInternal(key, value, false, true);
}
void RuntimeProfile::AppendInfoString(const string& key, const string& value) {
return AddInfoStringInternal(key, value, true);
}
void RuntimeProfileBase::AddInfoStringInternal(
const string& key, string value, bool append, bool redact) {
if (redact) Redact(&value);
StripTrailingWhitespace(&value);
lock_guard<SpinLock> l(info_strings_lock_);
InfoStrings::iterator it = info_strings_.find(key);
if (it == info_strings_.end()) {
info_strings_.emplace(key, std::move(value));
info_strings_display_order_.push_back(key);
} else {
if (append) {
it->second += ", " + std::move(value);
} else {
it->second = std::move(value);
}
}
}
void RuntimeProfile::UpdateInfoString(const string& key, string value) {
lock_guard<SpinLock> l(info_strings_lock_);
InfoStrings::iterator it = info_strings_.find(key);
if (it != info_strings_.end()) it->second = std::move(value);
}
const string* RuntimeProfile::GetInfoString(const string& key) const {
lock_guard<SpinLock> l(info_strings_lock_);
InfoStrings::const_iterator it = info_strings_.find(key);
if (it == info_strings_.end()) return NULL;
return &it->second;
}
void RuntimeProfile::AddCodegenMsg(
bool codegen_enabled, const string& extra_info, const string& extra_label) {
string str = codegen_enabled ? "Codegen Enabled" : "Codegen Disabled";
if (!extra_info.empty()) str = str + ": " + extra_info;
if (!extra_label.empty()) str = extra_label + " " + str;
AppendExecOption(str);
}
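// Generates the AddXXX()/AddXXXLocked() method pair for a counter type T. The unlocked
// method acquires 'counter_map_lock_' and delegates to the *Locked variant, which
// returns the existing counter if one with the same name was already registered,
// otherwise creates a new counter and records it as a child of 'parent_counter_name'.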
#define ADD_COUNTER_IMPL(NAME, T) \
RuntimeProfile::T* RuntimeProfile::NAME( \
const string& name, TUnit::type unit, const string& parent_counter_name) { \
lock_guard<SpinLock> l(counter_map_lock_); \
bool dummy; \
return NAME##Locked(name, unit, parent_counter_name, &dummy); \
} \
RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name, \
TUnit::type unit, const string& parent_counter_name, bool* created) { \
counter_map_lock_.DCheckLocked(); \
if (counter_map_.find(name) != counter_map_.end()) { \
*created = false; \
return reinterpret_cast<T*>(counter_map_[name]); \
} \
DCHECK(parent_counter_name == ROOT_COUNTER \
|| counter_map_.find(parent_counter_name) != counter_map_.end()); \
T* counter = pool_->Add(new T(unit)); \
counter_map_[name] = counter; \
set<string>* child_counters = \
FindOrInsert(&child_counter_map_, parent_counter_name, set<string>()); \
child_counters->insert(name); \
*created = true; \
return counter; \
}
ADD_COUNTER_IMPL(AddCounter, Counter);
ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter);
ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter);
RuntimeProfile::DerivedCounter* RuntimeProfile::AddDerivedCounter(
const string& name, TUnit::type unit,
const SampleFunction& counter_fn, const string& parent_counter_name) {
lock_guard<SpinLock> l(counter_map_lock_);
if (counter_map_.find(name) != counter_map_.end()) return NULL;
DerivedCounter* counter = pool_->Add(new DerivedCounter(unit, counter_fn));
counter_map_[name] = counter;
set<string>* child_counters =
FindOrInsert(&child_counter_map_, parent_counter_name, set<string>());
child_counters->insert(name);
return counter;
}
RuntimeProfile::ThreadCounters* RuntimeProfile::AddThreadCounters(
const string& prefix) {
ThreadCounters* counter = pool_->Add(new ThreadCounters());
counter->total_time_ = AddCounter(prefix + THREAD_TOTAL_TIME, TUnit::TIME_NS);
counter->user_time_ = AddCounter(prefix + THREAD_USER_TIME, TUnit::TIME_NS,
prefix + THREAD_TOTAL_TIME);
counter->sys_time_ = AddCounter(prefix + THREAD_SYS_TIME, TUnit::TIME_NS,
prefix + THREAD_TOTAL_TIME);
counter->voluntary_context_switches_ =
AddCounter(prefix + THREAD_VOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT);
counter->involuntary_context_switches_ =
AddCounter(prefix + THREAD_INVOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT);
return counter;
}
void RuntimeProfile::AddLocalTimeCounter(const SampleFunction& counter_fn) {
DerivedCounter* local_time_counter = pool_->Add(
new DerivedCounter(TUnit::TIME_NS, counter_fn));
lock_guard<SpinLock> l(counter_map_lock_);
DCHECK(counter_map_.find(LOCAL_TIME_COUNTER_NAME) == counter_map_.end())
<< "LocalTimeCounter already exists in the map.";
counter_map_[LOCAL_TIME_COUNTER_NAME] = local_time_counter;
}
RuntimeProfileBase::Counter* RuntimeProfileBase::GetCounter(const string& name) {
lock_guard<SpinLock> l(counter_map_lock_);
if (counter_map_.find(name) != counter_map_.end()) {
return counter_map_[name];
}
return NULL;
}
RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::GetSummaryStatsCounter(
const string& name) {
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
return summary_stats_map_[name];
}
return nullptr;
}
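// Collect every counter named 'name' from this profile and all of its descendants.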
void RuntimeProfileBase::GetCounters(const string& name, vector<Counter*>* counters) {
Counter* c = GetCounter(name);
if (c != NULL) counters->push_back(c);
lock_guard<SpinLock> l(children_lock_);
for (int i = 0; i < children_.size(); ++i) {
children_[i].first->GetCounters(name, counters);
}
}
RuntimeProfile::EventSequence* RuntimeProfile::GetEventSequence(const string& name) const
{
lock_guard<SpinLock> l(event_sequence_lock_);
EventSequenceMap::const_iterator it = event_sequence_map_.find(name);
if (it == event_sequence_map_.end()) return NULL;
return it->second;
}
void RuntimeProfile::ToJson(Document* d) const {
// 'queryObj' stores all profile information in JSON format.
Value queryObj(kObjectType);
ToJsonHelper(&queryObj, d);
d->RemoveMember("contents");
d->AddMember("contents", queryObj, d->GetAllocator());
}
void RuntimeProfileBase::ToJsonCounters(Value* parent, Document* d,
const string& counter_name, const CounterMap& counter_map,
const ChildCounterMap& child_counter_map) const {
auto& allocator = d->GetAllocator();
ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
if (itr != child_counter_map.end()) {
const set<string>& child_counters = itr->second;
for (const string& child_counter: child_counters) {
CounterMap::const_iterator iter = counter_map.find(child_counter);
if (iter == counter_map.end()) continue;
Value counter(kObjectType);
iter->second->ToJson(*d, &counter);
Value child_counter_json(child_counter.c_str(), child_counter.size(), allocator);
counter.AddMember("counter_name", child_counter_json, allocator);
Value child_counters_json(kArrayType);
ToJsonCounters(
&child_counters_json, d, child_counter, counter_map, child_counter_map);
if (!child_counters_json.Empty()){
counter.AddMember("child_counters", child_counters_json, allocator);
}
parent->PushBack(counter, allocator);
}
}
}
void RuntimeProfileBase::ToJsonHelper(Value* parent, Document* d) const {
Document::AllocatorType& allocator = d->GetAllocator();
// Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock
// while we call value() on the counters (some of those might be DerivedCounters).
CounterMap counter_map;
ChildCounterMap child_counter_map;
{
lock_guard<SpinLock> l(counter_map_lock_);
counter_map = counter_map_;
child_counter_map = child_counter_map_;
}
// 1. Name
Value name(name_.c_str(), allocator);
parent->AddMember("profile_name", name, allocator);
// 2. Num_children
parent->AddMember("num_children", children_.size(), allocator);
// 3. Metadata
// Set the legacy 'metadata' field to the plan node ID (or -1 if unset) for
// compatibility with any tools that rely on the plan node id being set there. It has
// been replaced by 'node_metadata', which contains richer metadata.
parent->AddMember("metadata",
metadata_.__isset.plan_node_id ? metadata_.plan_node_id : -1, allocator);
// Thrift requires exactly one field of a union to be set, so we only mark
// node_metadata as set if that is the case.
if (metadata_.__isset.plan_node_id || metadata_.__isset.data_sink_id){
Value node_metadata_json(kObjectType);
if (metadata_.__isset.plan_node_id){
node_metadata_json.AddMember("plan_node_id", metadata_.plan_node_id, allocator);
}
if (metadata_.__isset.data_sink_id){
node_metadata_json.AddMember("data_sink_id", metadata_.data_sink_id, allocator);
}
parent->AddMember("node_metadata", node_metadata_json, allocator);
}
// 4. Info strings
// TODO: IMPALA-9382: move to subclass once 'info_strings_' is also moved
{
lock_guard<SpinLock> l(info_strings_lock_);
if (!info_strings_.empty()) {
Value info_strings_json(kArrayType);
for (const string& key : info_strings_display_order_) {
Value info_string_json(kObjectType);
Value key_json(key.c_str(), allocator);
auto value_itr = info_strings_.find(key);
DCHECK(value_itr != info_strings_.end());
Value value_json(value_itr->second.c_str(), allocator);
info_string_json.AddMember("key", key_json, allocator);
info_string_json.AddMember("value", value_json, allocator);
info_strings_json.PushBack(info_string_json, allocator);
}
parent->AddMember("info_strings", info_strings_json, allocator);
}
}
// 5. Counters and info strings from subclasses
ToJsonSubclass(parent, d);
// 6. Counters
Value counters(kArrayType);
ToJsonCounters(&counters, d, ROOT_COUNTER, counter_map, child_counter_map);
if (!counters.Empty()) {
parent->AddMember("counters", counters, allocator);
}
// 7. Children Runtime Profiles
//
// Create copy of children_ so we don't need to hold lock while we call
// ToJsonHelper() on the children.
ChildVector children;
{
lock_guard<SpinLock> l(children_lock_);
children = children_;
}
if (!children.empty()) {
Value child_profiles(kArrayType);
for (int i = 0; i < children.size(); ++i) {
RuntimeProfileBase* profile = children[i].first;
Value child_profile(kObjectType);
profile->ToJsonHelper(&child_profile, d);
child_profiles.PushBack(child_profile, allocator);
}
parent->AddMember("child_profiles", child_profiles, allocator);
}
}
void RuntimeProfile::ToJsonSubclass(Value* parent, Document* d) const {
Document::AllocatorType& allocator = d->GetAllocator();
// 1. Events
{
lock_guard<SpinLock> l(event_sequence_lock_);
if (!event_sequence_map_.empty()) {
Value event_sequences_json(kArrayType);
for (EventSequenceMap::const_iterator it = event_sequence_map_.begin();
it != event_sequence_map_.end(); ++it) {
Value event_sequence_json(kObjectType);
it->second->ToJson(*d, &event_sequence_json);
event_sequences_json.PushBack(event_sequence_json, allocator);
}
parent->AddMember("event_sequences", event_sequences_json, allocator);
}
}
// 2. Time_series_counter_map
{
// Serialize all time series counters to JSON objects.
lock_guard<SpinLock> l(counter_map_lock_);
if (!time_series_counter_map_.empty()) {
Value time_series_counters_json(kArrayType);
for (const TimeSeriesCounterMap::value_type& v : time_series_counter_map_) {
TimeSeriesCounter* counter = v.second;
Value time_series_json(kObjectType);
counter->ToJson(*d, &time_series_json);
time_series_counters_json.PushBack(time_series_json, allocator);
}
parent->AddMember("time_series_counters", time_series_counters_json, allocator);
}
}
// 3. SummaryStatsCounter
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (!summary_stats_map_.empty()) {
Value summary_stats_counters_json(kArrayType);
for (const SummaryStatsCounterMap::value_type& v : summary_stats_map_) {
Value summary_stats_counter(kObjectType);
Value summary_name_json(v.first.c_str(), v.first.size(), allocator);
v.second->ToJson(*d, &summary_stats_counter);
summary_stats_counter.AddMember("counter_name", summary_name_json, allocator);
summary_stats_counters_json.PushBack(summary_stats_counter, allocator);
}
parent->AddMember("summary_stats_counters", summary_stats_counters_json, allocator);
}
}
}
// Print the profile:
// 1. Profile Name
// 2. Info Strings
// 3. Counters
// 4. Children
void RuntimeProfileBase::PrettyPrint(ostream* s, const string& prefix) const {
ostream& stream = *s;
// Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock
// while we call value() on the counters (some of those might be DerivedCounters).
CounterMap counter_map;
ChildCounterMap child_counter_map;
{
lock_guard<SpinLock> l(counter_map_lock_);
counter_map = counter_map_;
child_counter_map = child_counter_map_;
}
int num_input_profiles = GetNumInputProfiles();
Counter* total_time = total_time_counter();
DCHECK(total_time != nullptr);
stream.flags(ios::fixed);
stream << prefix << name_;
if (num_input_profiles != 1) {
stream << " [" << num_input_profiles << " instances]";
}
stream << ":";
if (total_time->value() != 0) {
stream << "(Total: " << PrettyPrinter::Print(total_time->value(), total_time->unit())
<< ", non-child: "
<< PrettyPrinter::Print(local_time_ns_.Load(), TUnit::TIME_NS)
<< ", % non-child: " << setprecision(2)
<< BitcastFromInt64<double>(local_time_frac_.Load()) * 100 << "%)";
}
stream << endl;
// TODO: IMPALA-9382: move to RuntimeProfile::PrettyPrintInfoStrings() once we move
// 'info_strings_'.
{
lock_guard<SpinLock> l(info_strings_lock_);
for (const string& key: info_strings_display_order_) {
stream << prefix << " " << key << ": " << info_strings_.find(key)->second << endl;
}
}
PrettyPrintInfoStrings(s, prefix);
PrettyPrintSubclassCounters(s, prefix);
RuntimeProfileBase::PrintChildCounters(
prefix, ROOT_COUNTER, counter_map, child_counter_map, s);
// Create copy of children_ so we don't need to hold lock while we call
// PrettyPrint() on the children.
ChildVector children;
{
lock_guard<SpinLock> l(children_lock_);
children = children_;
}
for (int i = 0; i < children.size(); ++i) {
RuntimeProfileBase* profile = children[i].first;
bool indent = children[i].second;
profile->PrettyPrint(s, prefix + (indent ? " " : ""));
}
}
void RuntimeProfile::PrettyPrintInfoStrings(ostream* s, const string& prefix) const {}
void RuntimeProfile::PrettyPrintSubclassCounters(ostream* s, const string& prefix) const {
ostream& stream = *s;
{
// Print all the event timers as follows:
// <EventKey> Timeline: 2s719ms
// - Event 1: 6.522us (6.522us)
// - Event 2: 2s288ms (2s288ms)
// - Event 3: 2s410ms (121.138ms)
// The times in parentheses are the time elapsed since the last event.
vector<EventSequence::Event> events;
lock_guard<SpinLock> l(event_sequence_lock_);
for (const auto& event_sequence : event_sequence_map_) {
// If the stopwatch has never been started (e.g. because this sequence came from
// Thrift), look for the last element to tell us the total runtime. For
// currently-updating sequences, it's better to use the stopwatch value because that
// updates continuously.
int64_t last = event_sequence.second->ElapsedTime();
event_sequence.second->GetEvents(&events);
if (last == 0 && events.size() > 0) last = events.back().second;
stream << prefix << " " << event_sequence.first << ": "
<< PrettyPrinter::Print(last, TUnit::TIME_NS)
<< endl;
int64_t prev = 0L;
event_sequence.second->GetEvents(&events);
for (const EventSequence::Event& event: events) {
stream << prefix << " - " << event.first << ": "
<< PrettyPrinter::Print(event.second, TUnit::TIME_NS) << " ("
<< PrettyPrinter::Print(event.second - prev, TUnit::TIME_NS) << ")"
<< endl;
prev = event.second;
}
}
}
{
// Print all time series counters as follows:
// - <Name> (<period>): <val1>, <val2>, <etc>
lock_guard<SpinLock> l(counter_map_lock_);
for (const auto& v : time_series_counter_map_) {
const TimeSeriesCounter* counter = v.second;
lock_guard<SpinLock> l(counter->lock_);
int num, period;
const int64_t* samples = counter->GetSamplesLocked(&num, &period);
if (num > 0) {
// Clamp number of printed values at 64, the maximum number of values in the
// SamplingTimeSeriesCounter.
int step = 1 + (num - 1) / 64;
period *= step;
stream << prefix << " - " << v.first << " ("
<< PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS) << "): ";
for (int i = 0; i < num; i += step) {
stream << PrettyPrinter::Print(samples[i], counter->unit());
if (i + step < num) stream << ", ";
}
if (step > 1) {
stream << " (Showing " << ((num + 1) / step) << " of " << num << " values from "
"Thrift Profile)";
}
stream << endl;
}
}
}
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
// Print all SummaryStatsCounters as follows:
// <Name>: (Avg: <value> ; Min: <min_value> ; Max: <max_value> ;
// Number of samples: <total>)
for (const auto& v : summary_stats_map_) {
v.second->PrettyPrint(prefix, v.first, s);
}
}
}
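// Serialize the profile tree to Thrift, then compress the serialized bytes with the
// default codec into 'out'.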
Status RuntimeProfile::Compress(vector<uint8_t>* out) const {
Status status;
TRuntimeProfileTree thrift_object;
const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object);
ThriftSerializer serializer(true);
vector<uint8_t> serialized_buffer;
RETURN_IF_ERROR(serializer.SerializeToVector(&thrift_object, &serialized_buffer));
// Compress the serialized thrift string. This uses string keys and is very
// easy to compress.
scoped_ptr<Codec> compressor;
Codec::CodecInfo codec_info(THdfsCompression::DEFAULT);
RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, codec_info, &compressor));
const auto close_compressor =
MakeScopeExitTrigger([&compressor]() { compressor->Close(); });
int64_t max_compressed_size = compressor->MaxOutputLen(serialized_buffer.size());
DCHECK_GT(max_compressed_size, 0);
out->resize(max_compressed_size);
int64_t result_len = out->size();
uint8_t* compressed_buffer_ptr = out->data();
RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(),
serialized_buffer.data(), &result_len, &compressed_buffer_ptr));
out->resize(result_len);
return Status::OK();
}
Status RuntimeProfile::DecompressToThrift(
const vector<uint8_t>& compressed_profile, TRuntimeProfileTree* out) {
scoped_ptr<Codec> decompressor;
MemTracker mem_tracker;
MemPool mem_pool(&mem_tracker);
const auto close_mem_tracker = MakeScopeExitTrigger([&mem_pool, &mem_tracker]() {
mem_pool.FreeAll();
mem_tracker.Close();
});
RETURN_IF_ERROR(Codec::CreateDecompressor(
&mem_pool, false, THdfsCompression::DEFAULT, &decompressor));
const auto close_decompressor =
MakeScopeExitTrigger([&decompressor]() { decompressor->Close(); });
int64_t result_len;
uint8_t* decompressed_buffer;
RETURN_IF_ERROR(decompressor->ProcessBlock(false, compressed_profile.size(),
compressed_profile.data(), &result_len, &decompressed_buffer));
uint32_t deserialized_len = static_cast<uint32_t>(result_len);
RETURN_IF_ERROR(
DeserializeThriftMsg(decompressed_buffer, &deserialized_len, true, out));
return Status::OK();
}
Status RuntimeProfile::DecompressToProfile(
const vector<uint8_t>& compressed_profile, ObjectPool* pool, RuntimeProfile** out) {
TRuntimeProfileTree thrift_profile;
RETURN_IF_ERROR(
RuntimeProfile::DecompressToThrift(compressed_profile, &thrift_profile));
*out = RuntimeProfile::CreateFromThrift(pool, thrift_profile);
return Status::OK();
}
Status RuntimeProfile::SerializeToArchiveString(string* out) const {
stringstream ss;
RETURN_IF_ERROR(SerializeToArchiveString(&ss));
*out = ss.str();
return Status::OK();
}
Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
vector<uint8_t> compressed_buffer;
RETURN_IF_ERROR(Compress(&compressed_buffer));
Base64Encode(compressed_buffer, out);
return Status::OK();
}
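// Inverse of SerializeToArchiveString(): base64-decode 'archive_str', then decompress
// and deserialize the result into a TRuntimeProfileTree.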
Status RuntimeProfile::DeserializeFromArchiveString(
const std::string& archive_str, TRuntimeProfileTree* out) {
int64_t decoded_max;
if (!Base64DecodeBufLen(archive_str.c_str(), archive_str.size(), &decoded_max)) {
return Status("Error in DeserializeFromArchiveString: Base64DecodeBufLen failed.");
}
vector<uint8_t> decoded_buffer;
decoded_buffer.resize(decoded_max);
int64_t decoded_len;
if (!Base64Decode(archive_str.c_str(), archive_str.size(), decoded_max,
reinterpret_cast<char*>(decoded_buffer.data()), &decoded_len)) {
return Status("Error in DeserializeFromArchiveString: Base64Decode failed.");
}
decoded_buffer.resize(decoded_len);
return DecompressToThrift(decoded_buffer, out);
}
void RuntimeProfile::SetTExecSummary(const TExecSummary& summary) {
lock_guard<SpinLock> l(t_exec_summary_lock_);
t_exec_summary_ = summary;
}
void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const {
tree->nodes.clear();
ToThriftHelper(&tree->nodes);
ExecSummaryToThrift(tree);
if (FLAGS_gen_experimental_profile) tree->__set_profile_version(2);
}
void RuntimeProfileBase::ToThriftHelper(vector<TRuntimeProfileNode>* nodes) const {
// Use a two-pass approach where we first collect nodes with a pre-order traversal and
// then serialize them. This is done to allow reserving the full vector of
// TRuntimeProfileNodes upfront - copying the constructed nodes when resizing the vector
// can be very expensive - see IMPALA-9378.
vector<CollectedNode> preorder_nodes;
CollectNodes(false, &preorder_nodes);
nodes->reserve(preorder_nodes.size());
for (CollectedNode& preorder_node : preorder_nodes) {
nodes->emplace_back();
TRuntimeProfileNode& node = nodes->back();
preorder_node.node->ToThriftHelper(&node);
node.indent = preorder_node.indent;
node.num_children = preorder_node.num_children;
}
}
void RuntimeProfileBase::ToThriftHelper(TRuntimeProfileNode* out_node) const {
// Use a reference to reduce code churn. TODO: clean this up to use a pointer later.
TRuntimeProfileNode& node = *out_node;
node.name = name_;
// Set the required metadata field to the plan node ID for compatibility with any tools
// that rely on the plan node id being set there.
node.metadata = metadata_.__isset.plan_node_id ? metadata_.plan_node_id : -1;
// Thrift requires exactly one field of a union to be set so we only mark node_metadata
// as set if that is the case.
if (metadata_.__isset.plan_node_id || metadata_.__isset.data_sink_id) {
node.__set_node_metadata(metadata_);
}
// Copy of the entries. We need to do this because calling value() on entries in
// 'counter_map_' might invoke an arbitrary function. Use vector instead of map
// for efficiency of creation and iteration. Only copy references to string keys.
// std::map entries are stable.
vector<pair<const string&, const Counter*>> counter_map_entries;
{
lock_guard<SpinLock> l(counter_map_lock_);
counter_map_entries.reserve(counter_map_.size());
std::copy(counter_map_.begin(), counter_map_.end(),
std::back_inserter(counter_map_entries));
node.child_counters_map = child_counter_map_;
}
// TODO: IMPALA-9382: move to RuntimeProfile::ToThriftSubclass() once 'info_strings_'
// is moved.
{
lock_guard<SpinLock> l(info_strings_lock_);
out_node->info_strings = info_strings_;
out_node->info_strings_display_order = info_strings_display_order_;
}
ToThriftSubclass(counter_map_entries, out_node);
}
void RuntimeProfile::ToThriftSubclass(
vector<pair<const string&, const Counter*>>& counter_map_entries,
TRuntimeProfileNode* out_node) const {
out_node->counters.reserve(counter_map_entries.size());
for (const auto& entry : counter_map_entries) {
out_node->counters.emplace_back();
TCounter& counter = out_node->counters.back();
counter.name = entry.first;
counter.value = entry.second->value();
counter.unit = entry.second->unit();
}
{
lock_guard<SpinLock> l(counter_map_lock_);
if (time_series_counter_map_.size() != 0) {
out_node->__isset.time_series_counters = true;
out_node->time_series_counters.reserve(time_series_counter_map_.size());
for (const auto& val : time_series_counter_map_) {
out_node->time_series_counters.emplace_back();
val.second->ToThrift(&out_node->time_series_counters.back());
}
}
}
{
vector<EventSequence::Event> events;
lock_guard<SpinLock> l(event_sequence_lock_);
if (event_sequence_map_.size() != 0) {
out_node->__isset.event_sequences = true;
out_node->event_sequences.reserve(event_sequence_map_.size());
for (const auto& val : event_sequence_map_) {
out_node->event_sequences.emplace_back();
TEventSequence& seq = out_node->event_sequences.back();
seq.name = val.first;
val.second->GetEvents(&events);
seq.labels.reserve(events.size());
seq.timestamps.reserve(events.size());
for (const EventSequence::Event& ev: events) {
seq.labels.push_back(move(ev.first));
seq.timestamps.push_back(ev.second);
}
}
}
}
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (summary_stats_map_.size() != 0) {
out_node->__isset.summary_stats_counters = true;
out_node->summary_stats_counters.resize(summary_stats_map_.size());
int idx = 0;
for (const SummaryStatsCounterMap::value_type& val: summary_stats_map_) {
val.second->ToThrift(&out_node->summary_stats_counters[idx++], val.first);
}
}
}
}
void RuntimeProfileBase::CollectNodes(bool indent, vector<CollectedNode>* nodes) const {
lock_guard<SpinLock> l(children_lock_);
nodes->emplace_back(this, indent, children_.size());
for (const auto& child : children_) {
child.first->CollectNodes(child.second, nodes);
}
}
void RuntimeProfile::ExecSummaryToThrift(TRuntimeProfileTree* tree) const {
GetExecSummary(&tree->exec_summary);
tree->__isset.exec_summary = true;
}
void RuntimeProfile::GetExecSummary(TExecSummary* t_exec_summary) const {
lock_guard<SpinLock> l(t_exec_summary_lock_);
*t_exec_summary = t_exec_summary_;
}
void RuntimeProfile::SetPlanNodeId(int node_id) {
DCHECK(!metadata_.__isset.data_sink_id) << "Don't set conflicting metadata";
metadata_.__set_plan_node_id(node_id);
}
void RuntimeProfile::SetDataSinkId(int sink_id) {
DCHECK(!metadata_.__isset.plan_node_id) << "Don't set conflicting metadata";
metadata_.__set_data_sink_id(sink_id);
}
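// Returns the rate at which 'total_counter' (bytes or units) accumulated over the
// elapsed time of 'timer', or 0 if the timer has not advanced.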
int64_t RuntimeProfile::UnitsPerSecond(
const Counter* total_counter, const Counter* timer) {
DCHECK(total_counter->unit() == TUnit::BYTES || total_counter->unit() == TUnit::UNIT);
DCHECK(timer->unit() == TUnit::TIME_NS);
if (timer->value() == 0) return 0;
double secs = static_cast<double>(timer->value()) / 1000.0 / 1000.0 / 1000.0;
return total_counter->value() / secs;
}
int64_t RuntimeProfile::CounterSum(const vector<Counter*>* counters) {
int64_t value = 0;
for (int i = 0; i < counters->size(); ++i) {
value += (*counters)[i]->value();
}
return value;
}
RuntimeProfileBase::Counter* RuntimeProfile::AddRateCounter(
const string& name, Counter* src_counter) {
TUnit::type dst_unit;
switch (src_counter->unit()) {
case TUnit::BYTES:
dst_unit = TUnit::BYTES_PER_SECOND;
break;
case TUnit::UNIT:
dst_unit = TUnit::UNIT_PER_SECOND;
break;
default:
DCHECK(false) << "Unsupported src counter unit: " << src_counter->unit();
return NULL;
}
{
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter = AddCounterLocked(name, dst_unit, ROOT_COUNTER, &created);
if (!created) return dst_counter;
rate_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
PeriodicCounterUpdater::RATE_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
}
RuntimeProfileBase::Counter* RuntimeProfile::AddRateCounter(
const string& name, SampleFunction fn, TUnit::type dst_unit) {
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter = AddCounterLocked(name, dst_unit, ROOT_COUNTER, &created);
if (!created) return dst_counter;
rate_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter,
PeriodicCounterUpdater::RATE_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
RuntimeProfileBase::Counter* RuntimeProfile::AddSamplingCounter(
const string& name, Counter* src_counter) {
DCHECK(src_counter->unit() == TUnit::UNIT);
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter =
AddCounterLocked(name, TUnit::DOUBLE_VALUE, ROOT_COUNTER, &created);
if (!created) return dst_counter;
sampling_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter,
PeriodicCounterUpdater::SAMPLING_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
RuntimeProfileBase::Counter* RuntimeProfile::AddSamplingCounter(
const string& name, SampleFunction sample_fn) {
lock_guard<SpinLock> l(counter_map_lock_);
bool created;
Counter* dst_counter =
AddCounterLocked(name, TUnit::DOUBLE_VALUE, ROOT_COUNTER, &created);
if (!created) return dst_counter;
sampling_counters_.push_back(dst_counter);
PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter,
PeriodicCounterUpdater::SAMPLING_COUNTER);
has_active_periodic_counters_ = true;
return dst_counter;
}
vector<RuntimeProfileBase::Counter*>* RuntimeProfile::AddBucketingCounters(
Counter* src_counter, int num_buckets) {
lock_guard<SpinLock> l(counter_map_lock_);
vector<Counter*>* buckets = pool_->Add(new vector<Counter*>);
for (int i = 0; i < num_buckets; ++i) {
buckets->push_back(pool_->Add(new Counter(TUnit::DOUBLE_VALUE, 0)));
}
bucketing_counters_.insert(buckets);
has_active_periodic_counters_ = true;
PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets);
return buckets;
}
RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name) {
lock_guard<SpinLock> l(event_sequence_lock_);
EventSequenceMap::iterator timer_it = event_sequence_map_.find(name);
if (timer_it != event_sequence_map_.end()) return timer_it->second;
EventSequence* timer = pool_->Add(new EventSequence());
event_sequence_map_[name] = timer;
return timer;
}
RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name,
const TEventSequence& from) {
lock_guard<SpinLock> l(event_sequence_lock_);
EventSequenceMap::iterator timer_it = event_sequence_map_.find(name);
if (timer_it != event_sequence_map_.end()) return timer_it->second;
EventSequence* timer = pool_->Add(new EventSequence(from.timestamps, from.labels));
event_sequence_map_[name] = timer;
return timer;
}
void RuntimeProfileBase::PrintChildCounters(const string& prefix,
const string& counter_name, const CounterMap& counter_map,
const ChildCounterMap& child_counter_map, ostream* s) {
ostream& stream = *s;
ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
if (itr != child_counter_map.end()) {
const set<string>& child_counters = itr->second;
for (const string& child_counter: child_counters) {
CounterMap::const_iterator iter = counter_map.find(child_counter);
if (iter == counter_map.end()) continue;
iter->second->PrettyPrint(prefix, iter->first, &stream);
RuntimeProfileBase::PrintChildCounters(
prefix + " ", child_counter, counter_map, child_counter_map, s);
}
}
}
RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
const string& name, TUnit::type unit, const std::string& parent_counter_name) {
lock_guard<SpinLock> l(summary_stats_map_lock_);
if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
return summary_stats_map_[name];
}
SummaryStatsCounter* counter = pool_->Add(new SummaryStatsCounter(unit));
summary_stats_map_[name] = counter;
return counter;
}
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
const string& name, TUnit::type unit, SampleFunction fn) {
DCHECK(fn != nullptr);
lock_guard<SpinLock> l(counter_map_lock_);
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
if (it != time_series_counter_map_.end()) return it->second;
TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn));
time_series_counter_map_[name] = counter;
PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
has_active_periodic_counters_ = true;
return counter;
}
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
const string& name, Counter* src_counter) {
DCHECK(src_counter != NULL);
return AddSamplingTimeSeriesCounter(name, src_counter->unit(),
bind(&Counter::value, src_counter));
}
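// Records the current value of 'sample_fn_' as a new sample; called periodically while
// the counter is registered with PeriodicCounterUpdater.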
void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) {
lock_guard<SpinLock> l(lock_);
int64_t sample = sample_fn_();
AddSampleLocked(sample, ms_elapsed);
}
const int64_t* RuntimeProfile::TimeSeriesCounter::GetSamplesLockedForSend(
int* num_samples, int* period) {
return GetSamplesLocked(num_samples, period);
}
void RuntimeProfile::TimeSeriesCounter::SetSamples(
int period, const std::vector<int64_t>& samples, int64_t start_idx) {
DCHECK(false);
}
void RuntimeProfile::SamplingTimeSeriesCounter::AddSampleLocked(
int64_t sample, int ms_elapsed){
samples_.AddSample(sample, ms_elapsed);
}
const int64_t* RuntimeProfile::SamplingTimeSeriesCounter::GetSamplesLocked(
int* num_samples, int* period) const {
return samples_.GetSamples(num_samples, period);
}
RuntimeProfile::ChunkedTimeSeriesCounter::ChunkedTimeSeriesCounter(
const string& name, TUnit::type unit, SampleFunction fn)
: TimeSeriesCounter(name, unit, fn)
, period_(FLAGS_periodic_counter_update_period_ms)
, max_size_(10 * FLAGS_status_report_interval_ms / period_) {}
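// Discard the samples that were already retrieved by GetSamplesLockedForSend() and
// remember how many samples have been dropped so far, so that the next ToThrift() call
// can report the correct start index.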
void RuntimeProfile::ChunkedTimeSeriesCounter::Clear() {
lock_guard<SpinLock> l(lock_);
previous_sample_count_ += last_get_count_;
values_.erase(values_.begin(), values_.begin() + last_get_count_);
last_get_count_ = 0;
}
void RuntimeProfile::ChunkedTimeSeriesCounter::AddSampleLocked(
int64_t sample, int ms_elapsed) {
  // We accept the inefficiency of erasing from the front of a vector rather than using
  // a std::deque because this should only happen very infrequently, and
  // GetSamplesLocked*() relies on contiguous storage.
if (max_size_ > 0 && values_.size() == max_size_) {
KLOG_EVERY_N_SECS(WARNING, 60) << "ChunkedTimeSeriesCounter reached maximum size";
values_.erase(values_.begin(), values_.begin() + 1);
}
DCHECK_LT(values_.size(), max_size_);
values_.push_back(sample);
}
const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLocked(
int* num_samples, int* period) const {
DCHECK(num_samples != nullptr);
DCHECK(period != nullptr);
*num_samples = values_.size();
*period = period_;
return values_.data();
}
const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLockedForSend(
int* num_samples, int* period) {
last_get_count_ = values_.size();
return GetSamplesLocked(num_samples, period);
}
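// Replaces or extends the stored samples with 'samples' received from another time
// series counter. 'start_idx' is the position of samples[0] in the overall series:
// 0 replaces the current contents entirely, while a positive value appends only the
// samples we have not yet seen (padding with zeroes if there is a gap).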
void RuntimeProfile::ChunkedTimeSeriesCounter::SetSamples(
int period, const std::vector<int64_t>& samples, int64_t start_idx) {
lock_guard<SpinLock> l(lock_);
if (start_idx == 0) {
// This could be coming from a SamplingTimeSeriesCounter or another
// ChunkedTimeSeriesCounter.
period_ = period;
values_ = samples;
return;
}
// Only ChunkedTimeSeriesCounter will set start_idx > 0.
DCHECK_GT(start_idx, 0);
DCHECK_EQ(period_, period);
if (values_.size() < start_idx) {
// Fill up with 0.
values_.resize(start_idx);
}
DCHECK_GE(values_.size(), start_idx);
// Skip values we already received.
auto start_it = samples.begin() + values_.size() - start_idx;
values_.insert(values_.end(), start_it, samples.end());
}
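// Like AddSamplingTimeSeriesCounter(), but backs the counter with a
// ChunkedTimeSeriesCounter, which retains samples until they have been sent and
// cleared via ClearChunkedTimeSeriesCounters().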
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddChunkedTimeSeriesCounter(
const string& name, TUnit::type unit, SampleFunction fn) {
DCHECK(fn != nullptr);
lock_guard<SpinLock> l(counter_map_lock_);
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
if (it != time_series_counter_map_.end()) return it->second;
TimeSeriesCounter* counter = pool_->Add(new ChunkedTimeSeriesCounter(name, unit, fn));
time_series_counter_map_[name] = counter;
PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
has_active_periodic_counters_ = true;
return counter;
}
void RuntimeProfileBase::ClearChunkedTimeSeriesCounters() {
{
lock_guard<SpinLock> l(children_lock_);
for (int i = 0; i < children_.size(); ++i) {
children_[i].first->ClearChunkedTimeSeriesCounters();
}
}
}
void RuntimeProfile::ClearChunkedTimeSeriesCounters() {
{
lock_guard<SpinLock> l(counter_map_lock_);
for (auto& it : time_series_counter_map_) it.second->Clear();
}
RuntimeProfileBase::ClearChunkedTimeSeriesCounters();
}
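// Serializes this time series counter to 'counter', copying the samples returned by
// GetSamplesLockedForSend() and recording the index of the first sample in
// 'start_index'.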
void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
lock_guard<SpinLock> l(lock_);
int num, period;
const int64_t* samples = GetSamplesLockedForSend(&num, &period);
counter->values.resize(num);
Ubsan::MemCpy(counter->values.data(), samples, num * sizeof(int64_t));
counter->name = name_;
counter->unit = unit_;
counter->period_ms = period;
counter->__set_start_index(previous_sample_count_);
}
void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) {
lock_guard<SpinLock> l(lock_);
  // It's possible that concurrent events are logged out of sequence, so sort the events
  // before serializing them.
SortEvents();
for (const EventSequence::Event& ev: events_) {
seq->labels.push_back(ev.first);
seq->timestamps.push_back(ev.second);
}
}
RuntimeProfileBase::AveragedCounter::AveragedCounter(TUnit::type unit, int num_samples)
: Counter(unit),
num_values_(num_samples),
has_value_(make_unique<AtomicBool[]>(num_samples)),
values_(make_unique<AtomicInt64[]>(num_samples)) {}
RuntimeProfileBase::AveragedCounter::AveragedCounter(TUnit::type unit,
const std::vector<bool>& has_value, const std::vector<int64_t>& values)
: Counter(unit),
num_values_(values.size()),
has_value_(make_unique<AtomicBool[]>(values.size())),
values_(make_unique<AtomicInt64[]>(values.size())) {
DCHECK_EQ(has_value.size(), values.size());
for (int i = 0; i < values.size(); ++i) {
if (has_value[i]) {
has_value_[i].Store(true);
values_[i].Store(values[i]);
}
}
}
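// Stores the current value of 'new_counter' into slot 'idx'. DOUBLE_VALUE counters
// store the bit pattern of the double so that both integer and floating point values
// fit in the same atomic int64 array.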
void RuntimeProfileBase::AveragedCounter::UpdateCounter(Counter* new_counter, int idx) {
DCHECK_EQ(new_counter->unit(), unit_);
DCHECK_GE(idx, 0);
DCHECK_LT(idx, num_values_);
if (unit_ == TUnit::DOUBLE_VALUE) {
double new_val = new_counter->double_value();
values_[idx].Store(BitcastToInt64(new_val));
} else {
values_[idx].Store(new_counter->value());
}
// Set has_value_ after the value is valid above so that readers won't
// see invalid values.
has_value_[idx].Store(true);
}
int64_t RuntimeProfileBase::AveragedCounter::value() const {
return unit_ == TUnit::DOUBLE_VALUE ? ComputeMean<double>() : ComputeMean<int64_t>();
}
template <typename T>
int64_t RuntimeProfileBase::AveragedCounter::ComputeMean() const {
// Compute the mean in a single pass over the input. We could instead call GetStats(),
// but this would add some additional overhead when serializing thrift profiles, which
// include the mean counter values.
T sum = 0;
int num_vals = 0;
for (int i = 0; i < num_values_; ++i) {
if (has_value_[i].Load()) {
sum += BitcastFromInt64<T>(values_[i].Load());
++num_vals;
}
}
if (num_vals == 0) return 0; // Avoid divide-by-zero.
return BitcastToInt64(sum / num_vals);
}
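// Collects the populated slots into a sorted vector and computes count, mean, min, max
// and percentiles over it. Percentiles are picked by index from the sorted values,
// e.g. p50 is the element at index (n - 1) / 2.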
template <typename T>
RuntimeProfileBase::AveragedCounter::Stats<T>
RuntimeProfileBase::AveragedCounter::GetStats() const {
static_assert(std::is_same<T, int64_t>::value || std::is_same<T, double>::value,
"Only double and int64_t are supported");
DCHECK_EQ(unit_ == TUnit::DOUBLE_VALUE, (std::is_same<T, double>::value));
vector<T> vals;
vals.reserve(num_values_);
for (int i = 0; i < num_values_; ++i) {
if (has_value_[i].Load()) vals.push_back(BitcastFromInt64<T>(values_[i].Load()));
}
Stats<T> result;
if (!vals.empty()) {
sort(vals.begin(), vals.end());
result.num_vals = vals.size();
    // Accumulate with an initial value of type T so that double values are not
    // truncated and int64_t sums do not overflow an int accumulator.
    result.mean =
        std::accumulate(vals.begin(), vals.end(), static_cast<T>(0)) / result.num_vals;
result.min = vals[0];
result.max = vals.back();
int end_idx = vals.size() - 1;
result.p50 = vals[end_idx / 2];
result.p75 = vals[end_idx * 3 / 4];
result.p90 = vals[end_idx * 9 / 10];
result.p95 = vals[end_idx * 19 / 20];
}
return result;
}
void RuntimeProfileBase::AveragedCounter::PrettyPrint(
const string& prefix, const string& name, ostream* s) const {
if (unit_ == TUnit::DOUBLE_VALUE) {
PrettyPrintImpl<double>(prefix, name, s);
} else {
PrettyPrintImpl<int64_t>(prefix, name, s);
}
}
template <typename T>
void RuntimeProfileBase::AveragedCounter::PrettyPrintImpl(
const string& prefix, const string& name, ostream* s) const {
Stats<T> stats = GetStats<T>();
(*s) << prefix << " - " << name << ": ";
if (!FLAGS_gen_experimental_profile || stats.num_vals == 1) {
(*s) << PrettyPrinter::Print(stats.mean, unit_, true) << endl;
return;
}
  // For counters with a number of values other than one, show summary stats followed by
  // the individual values.
(*s) << "mean=" << PrettyPrinter::Print(BitcastToInt64(stats.mean), unit_, true)
<< " min=" << PrettyPrinter::Print(BitcastToInt64(stats.min), unit_, true)
<< " p50=" << PrettyPrinter::Print(BitcastToInt64(stats.p50), unit_, true)
<< " p75=" << PrettyPrinter::Print(BitcastToInt64(stats.p75), unit_, true)
<< " p90=" << PrettyPrinter::Print(BitcastToInt64(stats.p90), unit_, true)
<< " p95=" << PrettyPrinter::Print(BitcastToInt64(stats.p95), unit_, true)
<< " max=" << PrettyPrinter::Print(BitcastToInt64(stats.max), unit_, true) << endl;
// Dump out individual values if they are not all identical.
if (stats.min != stats.max) {
(*s) << prefix << " [";
for (int i = 0; i < num_values_; ++i) {
if (i != 0) {
if (i % 8 == 0) {
(*s) << ",\n" << prefix << " ";
} else {
(*s) << ", ";
}
}
if (has_value_[i].Load()) {
(*s) << PrettyPrinter::Print(values_[i].Load(), unit_, false);
} else {
(*s) << "_";
}
}
(*s) << "]\n";
}
}
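// Serializes the per-instance validity flags and values into 'tcounter'. Values are
// written as raw int64_t bit patterns, matching the in-memory representation.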
void RuntimeProfileBase::AveragedCounter::ToThrift(
const string& name, TAggCounter* tcounter) const {
tcounter->name = name;
tcounter->unit = unit_;
tcounter->has_value.resize(num_values_);
tcounter->values.resize(num_values_);
for (int i = 0; i < num_values_; ++i) {
tcounter->has_value[i] = has_value_[i].Load();
tcounter->values[i] = values_[i].Load();
}
}
void RuntimeProfileBase::SummaryStatsCounter::ToThrift(
TSummaryStatsCounter* counter, const std::string& name) {
lock_guard<SpinLock> l(lock_);
counter->name = name;
counter->unit = unit_;
counter->sum = sum_;
counter->total_num_values = total_num_values_;
counter->min_value = min_;
counter->max_value = max_;
}
void RuntimeProfileBase::SummaryStatsCounter::ToThrift(const string& name,
TUnit::type unit, const vector<SummaryStatsCounter*>& counters,
TAggSummaryStatsCounter* tcounter) {
int num_vals = counters.size();
tcounter->name = name;
tcounter->unit = unit;
tcounter->has_value.resize(num_vals);
tcounter->sum.resize(num_vals);
tcounter->total_num_values.resize(num_vals);
tcounter->min_value.resize(num_vals);
tcounter->max_value.resize(num_vals);
for (int i = 0; i < num_vals; ++i) {
SummaryStatsCounter* counter = counters[i];
if (counter == nullptr) continue;
lock_guard<SpinLock> l(counter->lock_);
tcounter->has_value[i] = true;
tcounter->sum[i] = counter->sum_;
tcounter->min_value[i] = counter->min_;
tcounter->max_value[i] = counter->max_;
tcounter->total_num_values[i] = counter->total_num_values_;
}
}
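// Folds 'new_value' into the running sum, min, max and sample count, and updates
// 'value_' to the current mean.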
void RuntimeProfileBase::SummaryStatsCounter::UpdateCounter(int64_t new_value) {
lock_guard<SpinLock> l(lock_);
++total_num_values_;
sum_ += new_value;
value_.Store(sum_ / total_num_values_);
if (new_value < min_) min_ = new_value;
if (new_value > max_) max_ = new_value;
}
void RuntimeProfileBase::SummaryStatsCounter::SetStats(
const TSummaryStatsCounter& counter) {
// We drop this input if it looks malformed.
if (counter.total_num_values < 0) return;
lock_guard<SpinLock> l(lock_);
unit_ = counter.unit;
sum_ = counter.sum;
total_num_values_ = counter.total_num_values;
min_ = counter.min_value;
max_ = counter.max_value;
value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
}
void RuntimeProfileBase::SummaryStatsCounter::SetStats(const SummaryStatsCounter& other) {
lock_guard<SpinLock> ol(other.lock_);
lock_guard<SpinLock> l(lock_);
DCHECK_EQ(unit_, other.unit_);
total_num_values_ = other.total_num_values_;
min_ = other.min_;
max_ = other.max_;
sum_ = other.sum_;
value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
}
void RuntimeProfileBase::SummaryStatsCounter::Merge(const SummaryStatsCounter& other) {
lock_guard<SpinLock> ol(other.lock_);
lock_guard<SpinLock> l(lock_);
total_num_values_ += other.total_num_values_;
min_ = min(min_, other.min_);
max_ = max(max_, other.max_);
sum_ += other.sum_;
value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
}
int64_t RuntimeProfileBase::SummaryStatsCounter::MinValue() {
lock_guard<SpinLock> l(lock_);
return min_;
}
int64_t RuntimeProfileBase::SummaryStatsCounter::MaxValue() {
lock_guard<SpinLock> l(lock_);
return max_;
}
int32_t RuntimeProfileBase::SummaryStatsCounter::TotalNumValues() {
lock_guard<SpinLock> l(lock_);
return total_num_values_;
}
void RuntimeProfileBase::SummaryStatsCounter::PrettyPrint(
const string& prefix, const string& name, ostream* s) const {
ostream& stream = *s;
stream << prefix << " - " << name << ": ";
lock_guard<SpinLock> l(lock_);
if (total_num_values_ == 0) {
// No point printing all the stats if number of samples is zero.
stream << PrettyPrinter::Print(value_.Load(), unit_, true)
<< " (Number of samples: " << total_num_values_ << ")";
} else {
stream << "(Avg: " << PrettyPrinter::Print(value_.Load(), unit_, true)
<< " ; Min: " << PrettyPrinter::Print(min_, unit_, true)
<< " ; Max: " << PrettyPrinter::Print(max_, unit_, true)
<< " ; Number of samples: " << total_num_values_ << ")";
}
stream << endl;
}
void RuntimeProfileBase::Counter::PrettyPrint(
const string& prefix, const string& name, ostream* s) const {
(*s) << prefix << " - " << name << ": " << PrettyPrinter::Print(value(), unit_, true)
<< endl;
}
void RuntimeProfileBase::Counter::ToJson(Document& document, Value* val) const {
Value counter_json(kObjectType);
counter_json.AddMember("value", value(), document.GetAllocator());
auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_);
DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end());
Value unit_json(unit_itr->second, document.GetAllocator());
counter_json.AddMember("unit", unit_json, document.GetAllocator());
*val = counter_json;
}
void RuntimeProfile::TimeSeriesCounter::ToJson(Document& document, Value* val) {
lock_guard<SpinLock> lock(lock_);
Value counter_json(kObjectType);
Value counter_name_json(name_.c_str(), name_.size(), document.GetAllocator());
counter_json.AddMember("counter_name", counter_name_json, document.GetAllocator());
auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_);
DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end());
Value unit_json(unit_itr->second, document.GetAllocator());
counter_json.AddMember("unit", unit_json, document.GetAllocator());
int num, period;
const int64_t* samples = GetSamplesLocked(&num, &period);
counter_json.AddMember("num", num, document.GetAllocator());
counter_json.AddMember("period", period, document.GetAllocator());
stringstream stream;
// Clamp number of printed values at 64, the maximum number of values in the
// SamplingTimeSeriesCounter.
int step = 1 + (num - 1) / 64;
period *= step;
for (int i = 0; i < num; i += step) {
stream << samples[i];
if (i + step < num) stream << ",";
}
Value samples_data_json(stream.str().c_str(), document.GetAllocator());
counter_json.AddMember("data", samples_data_json, document.GetAllocator());
*val = counter_json;
}
void RuntimeProfile::EventSequence::ToJson(Document& document, Value* value) {
lock_guard<SpinLock> event_lock(lock_);
SortEvents();
Value event_sequence_json(kObjectType);
event_sequence_json.AddMember("offset", offset_, document.GetAllocator());
Value events_json(kArrayType);
for (const Event& ev: events_) {
Value event_json(kObjectType);
Value label_json(ev.first.c_str(), ev.first.size(), document.GetAllocator());
event_json.AddMember("label", label_json, document.GetAllocator());
event_json.AddMember("timestamp", ev.second, document.GetAllocator());
events_json.PushBack(event_json, document.GetAllocator());
}
event_sequence_json.AddMember("events", events_json, document.GetAllocator());
*value = event_sequence_json;
}
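// Sets up an aggregated profile that combines counters from 'num_input_profiles' input
// profiles, pre-creating the total time and inactive time counters. Only the root of an
// aggregated profile tree tracks the input profile names.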
AggregatedRuntimeProfile::AggregatedRuntimeProfile(
ObjectPool* pool, const string& name, int num_input_profiles, bool is_root)
: RuntimeProfileBase(pool, name), num_input_profiles_(num_input_profiles) {
DCHECK_GE(num_input_profiles, 0);
if (is_root) input_profile_names_.resize(num_input_profiles);
set<string>& root_counters = child_counter_map_[ROOT_COUNTER];
Counter* total_time_counter =
pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles));
Counter* inactive_timer =
pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles));
counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter;
root_counters.emplace(TOTAL_TIME_COUNTER_NAME);
counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer;
root_counters.emplace(INACTIVE_TIME_COUNTER_NAME);
}
AggregatedRuntimeProfile* AggregatedRuntimeProfile::Create(
ObjectPool* pool, const string& name, int num_input_profiles, bool is_root) {
return pool->Add(new AggregatedRuntimeProfile(pool, name, num_input_profiles, is_root));
}
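// Populates counters, aggregated info strings and summary stats counters from the
// 'aggregated' portion of a thrift profile node.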
void AggregatedRuntimeProfile::InitFromThrift(
const TRuntimeProfileNode& node, ObjectPool* pool) {
DCHECK(node.__isset.aggregated);
DCHECK(node.aggregated.__isset.counters);
if (node.aggregated.__isset.input_profiles) {
input_profile_names_ = node.aggregated.input_profiles;
}
for (const TAggCounter& counter : node.aggregated.counters) {
counter_map_[counter.name] =
pool->Add(new AveragedCounter(counter.unit, counter.has_value, counter.values));
}
DCHECK(node.aggregated.__isset.info_strings);
for (const auto& entry : node.aggregated.info_strings) {
vector<string>& per_instance_vals = agg_info_strings_[entry.first];
if (per_instance_vals.empty()) {
per_instance_vals.resize(num_input_profiles_);
}
for (const auto& distinct_val_entry : entry.second) {
for (int32_t idx : distinct_val_entry.second) {
per_instance_vals[idx] = distinct_val_entry.first;
}
}
}
if (node.aggregated.__isset.summary_stats_counters) {
for (const TAggSummaryStatsCounter& val : node.aggregated.summary_stats_counters) {
DCHECK_GT(num_input_profiles_, 0);
DCHECK_EQ(val.sum.size(), num_input_profiles_);
DCHECK_EQ(val.total_num_values.size(), num_input_profiles_);
DCHECK_EQ(val.min_value.size(), num_input_profiles_);
DCHECK_EQ(val.max_value.size(), num_input_profiles_);
auto& entry = summary_stats_map_[val.name];
entry.first = val.unit;
vector<SummaryStatsCounter*>& instance_counters = entry.second;
instance_counters.resize(num_input_profiles_);
for (int idx = 0; idx < num_input_profiles_; ++idx) {
if (!val.has_value[idx]) continue;
instance_counters[idx] =
pool->Add(new SummaryStatsCounter(val.unit, val.total_num_values[idx],
val.min_value[idx], val.max_value[idx], val.sum[idx]));
}
}
}
}
// Prints a sorted vector of indices in compressed form, with runs of consecutive
// indices printed as ranges. E.g. [1, 2, 3, 4, 6] results in "1-4,6".
static void PrettyPrintIndexRanges(ostream* s, const vector<int32_t>& indices) {
if (indices.empty()) return;
ostream& stream = *s;
int32_t start_idx = indices[0];
int32_t prev_idx = indices[0];
for (int i = 0; i < indices.size(); ++i) {
int32_t idx = indices[i];
if (idx > prev_idx + 1) {
// Start of a new range. Print the previous range of values.
stream << start_idx;
if (start_idx < prev_idx) stream << "-" << prev_idx;
stream << ",";
start_idx = idx;
}
prev_idx = idx;
}
// Print the last range of values.
stream << start_idx;
if (start_idx < prev_idx) stream << "-" << prev_idx;
}
void AggregatedRuntimeProfile::PrettyPrintInfoStrings(
ostream* s, const string& prefix) const {
// Aggregated info strings are only shown in experimental profile.
if (!FLAGS_gen_experimental_profile) return;
ostream& stream = *s;
{
lock_guard<SpinLock> l(input_profile_name_lock_);
if (!input_profile_names_.empty()) {
// TODO: IMPALA-9382: improve pretty-printing here
stream << prefix
<< "Instances: " << boost::algorithm::join(input_profile_names_, ", ")
<< endl;
}
}
{
lock_guard<SpinLock> l(agg_info_strings_lock_);
for (const auto& entry : agg_info_strings_) {
map<string, vector<int32_t>> distinct_vals = GroupDistinctInfoStrings(entry.second);
for (const auto& distinct_entry : distinct_vals) {
stream << prefix << " " << entry.first;
stream << "[";
PrettyPrintIndexRanges(s, distinct_entry.second);
stream << "]: " << distinct_entry.first << endl;
}
}
}
}
void AggregatedRuntimeProfile::PrettyPrintSubclassCounters(
ostream* s, const string& prefix) const {
// Hide aggregated state when we are not using the transposed profile
// format.
if (!FLAGS_gen_experimental_profile) return;
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
for (const auto& v : summary_stats_map_) {
// Display fully aggregated stats first.
SummaryStatsCounter aggregated_stats(v.second.first);
AggregateSummaryStats(v.second.second, &aggregated_stats);
aggregated_stats.PrettyPrint(prefix, v.first, s);
// Display per-instance stats, if there is more than one instance.
if (v.second.second.size() > 1) {
for (int idx = 0; idx < v.second.second.size(); ++idx) {
if (v.second.second[idx] == nullptr) continue;
          const string per_instance_prefix = Substitute("$0[$1]", prefix, idx);
          v.second.second[idx]->PrettyPrint(per_instance_prefix, v.first, s);
}
}
}
}
}
void AggregatedRuntimeProfile::ToThriftSubclass(
vector<pair<const string&, const Counter*>>& counter_map_entries,
TRuntimeProfileNode* out_node) const {
out_node->__isset.aggregated = true;
out_node->aggregated.__set_num_instances(num_input_profiles_);
if (!input_profile_names_.empty()) {
out_node->aggregated.__isset.input_profiles = true;
out_node->aggregated.input_profiles = input_profile_names_;
}
// Populate both the aggregated counters, which contain the full information from
// the counters, and the regular counters, which can be understood by older
// readers.
out_node->counters.reserve(counter_map_entries.size());
out_node->aggregated.__isset.counters = true;
out_node->aggregated.counters.reserve(counter_map_entries.size());
for (const auto& entry : counter_map_entries) {
out_node->counters.emplace_back();
TCounter& counter = out_node->counters.back();
counter.name = entry.first;
counter.value = entry.second->value();
counter.unit = entry.second->unit();
const AveragedCounter* avg_counter =
dynamic_cast<const AveragedCounter*>(entry.second);
DCHECK(avg_counter != nullptr) << entry.first;
out_node->aggregated.counters.emplace_back();
avg_counter->ToThrift(entry.first, &out_node->aggregated.counters.back());
}
out_node->aggregated.__isset.info_strings = true;
{
lock_guard<SpinLock> l(agg_info_strings_lock_);
for (const auto& entry : agg_info_strings_) {
out_node->aggregated.info_strings[entry.first] =
GroupDistinctInfoStrings(entry.second);
}
}
out_node->__isset.summary_stats_counters = true;
{
lock_guard<SpinLock> l(summary_stats_map_lock_);
out_node->summary_stats_counters.resize(summary_stats_map_.size());
out_node->aggregated.__isset.summary_stats_counters = true;
out_node->aggregated.summary_stats_counters.resize(summary_stats_map_.size());
int counter_idx = 0;
for (const auto& entry : summary_stats_map_) {
DCHECK_GT(entry.second.second.size(), 0)
<< "no counters can be created w/o instances";
DCHECK_EQ(num_input_profiles_, entry.second.second.size());
SummaryStatsCounter::ToThrift(entry.first, entry.second.first, entry.second.second,
&out_node->aggregated.summary_stats_counters[counter_idx]);
// Compute summarized stats to include at the top level.
SummaryStatsCounter aggregated_stats(entry.second.first);
AggregateSummaryStats(entry.second.second, &aggregated_stats);
aggregated_stats.ToThrift(
&out_node->summary_stats_counters[counter_idx++], entry.first);
}
}
}
void AggregatedRuntimeProfile::ToJsonSubclass(Value* parent, Document* d) const {
// TODO: IMPALA-9382: add Json aggregated profile.
}
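// Groups per-instance info string values by distinct value, mapping each non-empty
// value to the ascending list of instance indices that reported it.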
map<string, vector<int32_t>> AggregatedRuntimeProfile::GroupDistinctInfoStrings(
const vector<string>& info_string_values) const {
map<string, vector<int32_t>> distinct_vals;
DCHECK_EQ(info_string_values.size(), num_input_profiles_);
for (int idx = 0; idx < num_input_profiles_; ++idx) {
if (!info_string_values[idx].empty()) {
distinct_vals[info_string_values[idx]].push_back(idx);
}
}
return distinct_vals;
}
void AggregatedRuntimeProfile::AggregateSummaryStats(
const vector<SummaryStatsCounter*> counters, SummaryStatsCounter* result) {
for (SummaryStatsCounter* counter : counters) {
if (counter != nullptr) result->Merge(*counter);
}
}
}