| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "util/runtime-profile-counters.h" |
| |
| #include <algorithm> |
| #include <iomanip> |
| #include <iostream> |
| #include <utility> |
| |
| #include <boost/bind.hpp> |
| #include <boost/thread/locks.hpp> |
| #include <boost/thread/thread.hpp> |
| |
| #include "common/object-pool.h" |
| #include "gutil/strings/strip.h" |
| #include "kudu/util/logging.h" |
| #include "rpc/thrift-util.h" |
| #include "runtime/mem-tracker.h" |
| #include "util/coding-util.h" |
| #include "util/compress.h" |
| #include "util/container-util.h" |
| #include "util/debug-util.h" |
| #include "util/periodic-counter-updater.h" |
| #include "util/pretty-printer.h" |
| #include "util/redactor.h" |
| #include "util/scope-exit-trigger.h" |
| #include "util/ubsan.h" |
| |
| #include "common/names.h" |
| |
| DECLARE_int32(status_report_interval_ms); |
| DECLARE_int32(periodic_counter_update_period_ms); |
| |
| using namespace rapidjson; |
| |
| namespace impala { |
| |
| // Thread counters name |
| static const string THREAD_TOTAL_TIME = "TotalWallClockTime"; |
| static const string THREAD_USER_TIME = "UserTime"; |
| static const string THREAD_SYS_TIME = "SysTime"; |
| static const string THREAD_VOLUNTARY_CONTEXT_SWITCHES = "VoluntaryContextSwitches"; |
| static const string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryContextSwitches"; |
| |
| // The root counter name for all top level counters. |
| static const string ROOT_COUNTER = ""; |
| |
| const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime"; |
| const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime"; |
| const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime"; |
| |
| RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name, |
| bool is_averaged_profile) { |
| return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile)); |
| } |
| |
| RuntimeProfile::RuntimeProfile( |
| ObjectPool* pool, const string& name, bool is_averaged_profile) |
| : pool_(pool), |
| name_(name), |
| is_averaged_profile_(is_averaged_profile), |
| counter_total_time_(TUnit::TIME_NS), |
| inactive_timer_(TUnit::TIME_NS), |
| local_time_percent_(0), |
| local_time_ns_(0) { |
| Counter* total_time_counter; |
| Counter* inactive_timer; |
| if (!is_averaged_profile) { |
| total_time_counter = &counter_total_time_; |
| inactive_timer = &inactive_timer_; |
| } else { |
| total_time_counter = pool->Add(new AveragedCounter(TUnit::TIME_NS)); |
| inactive_timer = pool->Add(new AveragedCounter(TUnit::TIME_NS)); |
| } |
| counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter; |
| counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer; |
| } |
| |
| RuntimeProfile::~RuntimeProfile() { |
| DCHECK(!has_active_periodic_counters_); |
| } |
| |
| void RuntimeProfile::StopPeriodicCounters() { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| if (!has_active_periodic_counters_) return; |
| for (Counter* sampling_counter : sampling_counters_) { |
| PeriodicCounterUpdater::StopSamplingCounter(sampling_counter); |
| } |
| for (Counter* rate_counter : rate_counters_) { |
| PeriodicCounterUpdater::StopRateCounter(rate_counter); |
| } |
| for (vector<Counter*>* counters : bucketing_counters_) { |
| PeriodicCounterUpdater::StopBucketingCounters(counters); |
| } |
| for (auto& time_series_counter_entry : time_series_counter_map_) { |
| PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_counter_entry.second); |
| } |
| has_active_periodic_counters_ = false; |
| } |
| |
| RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool, |
| const TRuntimeProfileTree& profiles) { |
| if (profiles.nodes.size() == 0) return NULL; |
| int idx = 0; |
| RuntimeProfile* profile = RuntimeProfile::CreateFromThrift(pool, profiles.nodes, &idx); |
| profile->SetTExecSummary(profiles.exec_summary); |
| return profile; |
| } |
| |
| RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool, |
| const vector<TRuntimeProfileNode>& nodes, int* idx) { |
| DCHECK_LT(*idx, nodes.size()); |
| |
| const TRuntimeProfileNode& node = nodes[*idx]; |
| RuntimeProfile* profile = Create(pool, node.name); |
| profile->metadata_ = node.node_metadata; |
| for (int i = 0; i < node.counters.size(); ++i) { |
| const TCounter& counter = node.counters[i]; |
| profile->counter_map_[counter.name] = |
| pool->Add(new Counter(counter.unit, counter.value)); |
| } |
| |
| if (node.__isset.event_sequences) { |
| for (const TEventSequence& sequence: node.event_sequences) { |
| profile->event_sequence_map_[sequence.name] = |
| pool->Add(new EventSequence(sequence.timestamps, sequence.labels)); |
| } |
| } |
| |
| if (node.__isset.time_series_counters) { |
| for (const TTimeSeriesCounter& val: node.time_series_counters) { |
| // Capture all incoming time series counters with the same type since re-sampling |
| // will have happened on the sender side. |
| profile->time_series_counter_map_[val.name] = pool->Add( |
| new ChunkedTimeSeriesCounter(val.name, val.unit, val.period_ms, val.values)); |
| } |
| } |
| |
| if (node.__isset.summary_stats_counters) { |
| for (const TSummaryStatsCounter& val: node.summary_stats_counters) { |
| profile->summary_stats_map_[val.name] = |
| pool->Add(new SummaryStatsCounter( |
| val.unit, val.total_num_values, val.min_value, val.max_value, val.sum)); |
| } |
| } |
| |
| profile->child_counter_map_ = node.child_counters_map; |
| profile->info_strings_ = node.info_strings; |
| profile->info_strings_display_order_ = node.info_strings_display_order; |
| |
| ++*idx; |
| for (int i = 0; i < node.num_children; ++i) { |
| profile->AddChild(RuntimeProfile::CreateFromThrift(pool, nodes, idx)); |
| } |
| return profile; |
| } |
| |
| void RuntimeProfile::UpdateAverage(RuntimeProfile* other) { |
| DCHECK(other != NULL); |
| DCHECK(is_averaged_profile_); |
| |
| // Merge this level |
| { |
| CounterMap::iterator dst_iter; |
| CounterMap::const_iterator src_iter; |
| lock_guard<SpinLock> l(counter_map_lock_); |
| lock_guard<SpinLock> m(other->counter_map_lock_); |
| for (src_iter = other->counter_map_.begin(); |
| src_iter != other->counter_map_.end(); ++src_iter) { |
| |
| // Ignore this counter for averages. |
| if (src_iter->first == INACTIVE_TIME_COUNTER_NAME) continue; |
| |
| dst_iter = counter_map_.find(src_iter->first); |
| AveragedCounter* avg_counter; |
| |
| // Get the counter with the same name in dst_iter (this->counter_map_) |
| // Create one if it doesn't exist. |
| if (dst_iter == counter_map_.end()) { |
| avg_counter = pool_->Add(new AveragedCounter(src_iter->second->unit())); |
| counter_map_[src_iter->first] = avg_counter; |
| } else { |
| DCHECK(dst_iter->second->unit() == src_iter->second->unit()); |
| avg_counter = static_cast<AveragedCounter*>(dst_iter->second); |
| } |
| avg_counter->UpdateCounter(src_iter->second); |
| } |
| |
| // TODO: Can we unlock the counter_map_lock_ here? |
| ChildCounterMap::const_iterator child_counter_src_itr; |
| for (child_counter_src_itr = other->child_counter_map_.begin(); |
| child_counter_src_itr != other->child_counter_map_.end(); |
| ++child_counter_src_itr) { |
| set<string>* child_counters = FindOrInsert(&child_counter_map_, |
| child_counter_src_itr->first, set<string>()); |
| child_counters->insert(child_counter_src_itr->second.begin(), |
| child_counter_src_itr->second.end()); |
| } |
| } |
| |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| lock_guard<SpinLock> m(other->children_lock_); |
| // Recursively merge children with matching names. |
| // Track the current position in the vector so we preserve the order of children |
| // if children are added after the first Update()/UpdateAverage() call (IMPALA-6694). |
| // E.g. if the first update sends [B, D] and the second update sends [A, B, C, D], |
| // then this code makes sure that children_ is [A, B, C, D] afterwards. |
| ChildVector::iterator insert_pos = children_.begin(); |
| for (int i = 0; i < other->children_.size(); ++i) { |
| RuntimeProfile* other_child = other->children_[i].first; |
| ChildMap::iterator j = child_map_.find(other_child->name_); |
| RuntimeProfile* child = NULL; |
| if (j != child_map_.end()) { |
| child = j->second; |
| // Search forward until the insert position is either at the end of the vector |
| // or after this child. This preserves the order if the relative order of |
| // children in all updates is consistent. |
| bool found_child = false; |
| while (insert_pos != children_.end() && !found_child) { |
| found_child = insert_pos->first == child; |
| ++insert_pos; |
| } |
| } else { |
| child = Create(pool_, other_child->name_, true); |
| child->metadata_ = other_child->metadata_; |
| bool indent_other_child = other->children_[i].second; |
| child_map_[child->name_] = child; |
| insert_pos = children_.insert(insert_pos, make_pair(child, indent_other_child)); |
| ++insert_pos; |
| } |
| child->UpdateAverage(other_child); |
| } |
| } |
| |
| ComputeTimeInProfile(); |
| } |
| |
| void RuntimeProfile::Update(const TRuntimeProfileTree& thrift_profile) { |
| int idx = 0; |
| Update(thrift_profile.nodes, &idx); |
| DCHECK_EQ(idx, thrift_profile.nodes.size()); |
| } |
| |
| void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx) { |
| if (UNLIKELY(nodes.size()) == 0) return; |
| DCHECK_LT(*idx, nodes.size()); |
| const TRuntimeProfileNode& node = nodes[*idx]; |
| { |
| // Update this level. |
| map<string, Counter*>::iterator dst_iter; |
| lock_guard<SpinLock> l(counter_map_lock_); |
| for (int i = 0; i < node.counters.size(); ++i) { |
| const TCounter& tcounter = node.counters[i]; |
| CounterMap::iterator j = counter_map_.find(tcounter.name); |
| if (j == counter_map_.end()) { |
| counter_map_[tcounter.name] = |
| pool_->Add(new Counter(tcounter.unit, tcounter.value)); |
| } else { |
| if (j->second->unit() != tcounter.unit) { |
| LOG(ERROR) << "Cannot update counters with the same name (" |
| << j->first << ") but different units."; |
| } else { |
| j->second->Set(tcounter.value); |
| } |
| } |
| } |
| |
| ChildCounterMap::const_iterator child_counter_src_itr; |
| for (child_counter_src_itr = node.child_counters_map.begin(); |
| child_counter_src_itr != node.child_counters_map.end(); |
| ++child_counter_src_itr) { |
| set<string>* child_counters = FindOrInsert(&child_counter_map_, |
| child_counter_src_itr->first, set<string>()); |
| child_counters->insert(child_counter_src_itr->second.begin(), |
| child_counter_src_itr->second.end()); |
| } |
| } |
| |
| { |
| const InfoStrings& info_strings = node.info_strings; |
| lock_guard<SpinLock> l(info_strings_lock_); |
| for (const string& key: node.info_strings_display_order) { |
| // Look for existing info strings and update in place. If there |
| // are new strings, add them to the end of the display order. |
| // TODO: Is nodes.info_strings always a superset of |
| // info_strings_? If so, can just copy the display order. |
| InfoStrings::const_iterator it = info_strings.find(key); |
| DCHECK(it != info_strings.end()); |
| InfoStrings::iterator existing = info_strings_.find(key); |
| if (existing == info_strings_.end()) { |
| info_strings_.emplace(key, it->second); |
| info_strings_display_order_.push_back(key); |
| } else { |
| info_strings_[key] = it->second; |
| } |
| } |
| } |
| |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| for (int i = 0; i < node.time_series_counters.size(); ++i) { |
| const TTimeSeriesCounter& c = node.time_series_counters[i]; |
| TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name); |
| if (it == time_series_counter_map_.end()) { |
| // Capture all incoming time series counters with the same type since re-sampling |
| // will have happened on the sender side. |
| time_series_counter_map_[c.name] = pool_->Add( |
| new ChunkedTimeSeriesCounter(c.name, c.unit, c.period_ms, c.values)); |
| } else { |
| int64_t start_idx = c.__isset.start_index ? c.start_index : 0; |
| it->second->SetSamples(c.period_ms, c.values, start_idx); |
| } |
| } |
| } |
| |
| { |
| lock_guard<SpinLock> l(event_sequence_lock_); |
| for (int i = 0; i < node.event_sequences.size(); ++i) { |
| const TEventSequence& seq = node.event_sequences[i]; |
| EventSequenceMap::iterator it = event_sequence_map_.find(seq.name); |
| if (it == event_sequence_map_.end()) { |
| event_sequence_map_[seq.name] = |
| pool_->Add(new EventSequence(seq.timestamps, seq.labels)); |
| } else { |
| it->second->AddNewerEvents(seq.timestamps, seq.labels); |
| } |
| } |
| } |
| |
| { |
| lock_guard<SpinLock> l(summary_stats_map_lock_); |
| for (int i = 0; i < node.summary_stats_counters.size(); ++i) { |
| const TSummaryStatsCounter& c = node.summary_stats_counters[i]; |
| SummaryStatsCounterMap::iterator it = summary_stats_map_.find(c.name); |
| if (it == summary_stats_map_.end()) { |
| summary_stats_map_[c.name] = |
| pool_->Add(new SummaryStatsCounter( |
| c.unit, c.total_num_values, c.min_value, c.max_value, c.sum)); |
| } else { |
| it->second->SetStats(c); |
| } |
| } |
| } |
| |
| ++*idx; |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| // Track the current position in the vector so we preserve the order of children |
| // if children are added after the first Update()/UpdateAverage() call (IMPALA-6694). |
| // E.g. if the first update sends [B, D] and the second update sends [A, B, C, D], |
| // then this code makes sure that children_ is [A, B, C, D] afterwards. |
| ChildVector::iterator insert_pos = children_.begin(); |
| // Update children with matching names; create new ones if they don't match. |
| for (int i = 0; i < node.num_children; ++i) { |
| const TRuntimeProfileNode& tchild = nodes[*idx]; |
| ChildMap::iterator j = child_map_.find(tchild.name); |
| RuntimeProfile* child = NULL; |
| if (j != child_map_.end()) { |
| child = j->second; |
| // Search forward until the insert position is either at the end of the vector |
| // or after this child. This preserves the order if the relative order of |
| // children in all updates is consistent. |
| bool found_child = false; |
| while (insert_pos != children_.end() && !found_child) { |
| found_child = insert_pos->first == child; |
| ++insert_pos; |
| } |
| } else { |
| child = Create(pool_, tchild.name); |
| child->metadata_ = tchild.node_metadata; |
| child_map_[tchild.name] = child; |
| insert_pos = children_.insert(insert_pos, make_pair(child, tchild.indent)); |
| ++insert_pos; |
| } |
| child->Update(nodes, idx); |
| } |
| } |
| } |
| |
| void RuntimeProfile::Divide(int n) { |
| DCHECK_GT(n, 0); |
| map<string, Counter*>::iterator iter; |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) { |
| if (iter->second->unit() == TUnit::DOUBLE_VALUE) { |
| iter->second->Set(iter->second->double_value() / n); |
| } else { |
| iter->second->value_.Store(iter->second->value() / n); |
| } |
| } |
| } |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) { |
| i->second->Divide(n); |
| } |
| } |
| } |
| |
| void RuntimeProfile::ComputeTimeInProfile() { |
| ComputeTimeInProfile(total_time_counter()->value()); |
| } |
| |
| void RuntimeProfile::ComputeTimeInProfile(int64_t total) { |
| // Recurse on children. After this, childrens' total time is up to date. |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| for (int i = 0; i < children_.size(); ++i) { |
| children_[i].first->ComputeTimeInProfile(); |
| } |
| } |
| |
| // Get total time from children |
| int64_t children_total_time = 0; |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| for (int i = 0; i < children_.size(); ++i) { |
| children_total_time += children_[i].first->total_time(); |
| } |
| } |
| // IMPALA-5200: Take the max, because the parent includes all of the time from the |
| // children, whether or not its total time counter has been updated recently enough |
| // to see this. |
| total_time_ns_ = max(children_total_time, total_time_counter()->value()); |
| |
| // If a local time counter exists, use its value as local time. Otherwise, derive the |
| // local time from total time and the child time. |
| bool has_local_time_counter = false; |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| CounterMap::const_iterator itr = counter_map_.find(LOCAL_TIME_COUNTER_NAME); |
| if (itr != counter_map_.end()) { |
| local_time_ns_ = itr->second->value(); |
| has_local_time_counter = true; |
| } |
| } |
| |
| if (!has_local_time_counter) { |
| local_time_ns_ = total_time_ns_ - children_total_time; |
| if (!is_averaged_profile_) { |
| local_time_ns_ -= inactive_timer()->value(); |
| } |
| } |
| // Counters have some margin, set to 0 if it was negative. |
| local_time_ns_ = ::max<int64_t>(0, local_time_ns_); |
| local_time_percent_ = |
| static_cast<double>(local_time_ns_) / total_time_ns_; |
| local_time_percent_ = ::min(1.0, local_time_percent_) * 100; |
| } |
| |
| void RuntimeProfile::AddChild(RuntimeProfile* child, bool indent, RuntimeProfile* loc) { |
| lock_guard<SpinLock> l(children_lock_); |
| ChildVector::iterator insert_pos; |
| if (loc == NULL) { |
| insert_pos = children_.end(); |
| } else { |
| bool found = false; |
| for (ChildVector::iterator it = children_.begin(); it != children_.end(); ++it) { |
| if (it->first == loc) { |
| insert_pos = it + 1; |
| found = true; |
| break; |
| } |
| } |
| DCHECK(found) << "Invalid loc"; |
| } |
| AddChildLocked(child, indent, insert_pos); |
| } |
| |
| void RuntimeProfile::AddChildLocked( |
| RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos) { |
| children_lock_.DCheckLocked(); |
| DCHECK(child != NULL); |
| if (child_map_.count(child->name_) > 0) { |
| // This child has already been added, so do nothing. |
| // Otherwise, the map and vector will be out of sync. |
| return; |
| } |
| child_map_[child->name_] = child; |
| children_.insert(insert_pos, make_pair(child, indent)); |
| } |
| |
| void RuntimeProfile::PrependChild(RuntimeProfile* child, bool indent) { |
| lock_guard<SpinLock> l(children_lock_); |
| AddChildLocked(child, indent, children_.begin()); |
| } |
| |
| RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent, |
| bool prepend) { |
| lock_guard<SpinLock> l(children_lock_); |
| DCHECK(child_map_.find(name) == child_map_.end()); |
| RuntimeProfile* child = Create(pool_, name); |
| AddChildLocked(child, indent, prepend ? children_.begin() : children_.end()); |
| return child; |
| } |
| |
| void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) { |
| children->clear(); |
| lock_guard<SpinLock> l(children_lock_); |
| for (const auto& entry : children_) children->push_back(entry.first); |
| } |
| |
| void RuntimeProfile::GetAllChildren(vector<RuntimeProfile*>* children) { |
| lock_guard<SpinLock> l(children_lock_); |
| for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) { |
| children->push_back(i->second); |
| i->second->GetAllChildren(children); |
| } |
| } |
| |
| void RuntimeProfile::SortChildrenByTotalTime() { |
| lock_guard<SpinLock> l(children_lock_); |
| // Create a snapshot of total time values so that they don't change while we're |
| // sorting. Sort the <total_time, index> pairs, then reshuffle children_. |
| vector<pair<int64_t, int64_t>> total_times; |
| for (int i = 0; i < children_.size(); ++i) { |
| total_times.emplace_back(children_[i].first->total_time_counter()->value(), i); |
| } |
| // Order by descending total time. |
| sort(total_times.begin(), total_times.end(), |
| [](const pair<int64_t, int64_t>& p1, const pair<int64_t, int64_t>& p2) { |
| return p1.first > p2.first; |
| }); |
| ChildVector new_children; |
| for (const auto& p : total_times) new_children.emplace_back(children_[p.second]); |
| children_ = move(new_children); |
| } |
| |
| void RuntimeProfile::AddInfoString(const string& key, const string& value) { |
| return AddInfoStringInternal(key, value, false); |
| } |
| |
| void RuntimeProfile::AddInfoStringRedacted(const string& key, const string& value) { |
| return AddInfoStringInternal(key, value, false, true); |
| } |
| |
| void RuntimeProfile::AppendInfoString(const string& key, const string& value) { |
| return AddInfoStringInternal(key, value, true); |
| } |
| |
| void RuntimeProfile::AddInfoStringInternal(const string& key, string value, |
| bool append, bool redact) { |
| |
| if (redact) Redact(&value); |
| |
| StripTrailingWhitespace(&value); |
| |
| lock_guard<SpinLock> l(info_strings_lock_); |
| InfoStrings::iterator it = info_strings_.find(key); |
| if (it == info_strings_.end()) { |
| info_strings_.emplace(key, std::move(value)); |
| info_strings_display_order_.push_back(key); |
| } else { |
| if (append) { |
| it->second += ", " + std::move(value); |
| } else { |
| it->second = std::move(value); |
| } |
| } |
| } |
| |
| void RuntimeProfile::UpdateInfoString(const string& key, string value) { |
| lock_guard<SpinLock> l(info_strings_lock_); |
| InfoStrings::iterator it = info_strings_.find(key); |
| if (it != info_strings_.end()) it->second = std::move(value); |
| } |
| |
| const string* RuntimeProfile::GetInfoString(const string& key) const { |
| lock_guard<SpinLock> l(info_strings_lock_); |
| InfoStrings::const_iterator it = info_strings_.find(key); |
| if (it == info_strings_.end()) return NULL; |
| return &it->second; |
| } |
| |
| void RuntimeProfile::AddCodegenMsg( |
| bool codegen_enabled, const string& extra_info, const string& extra_label) { |
| string str = codegen_enabled ? "Codegen Enabled" : "Codegen Disabled"; |
| if (!extra_info.empty()) str = str + ": " + extra_info; |
| if (!extra_label.empty()) str = extra_label + " " + str; |
| AppendExecOption(str); |
| } |
| |
| #define ADD_COUNTER_IMPL(NAME, T) \ |
| RuntimeProfile::T* RuntimeProfile::NAME( \ |
| const string& name, TUnit::type unit, const string& parent_counter_name) { \ |
| lock_guard<SpinLock> l(counter_map_lock_); \ |
| bool dummy; \ |
| return NAME##Locked(name, unit, parent_counter_name, &dummy); \ |
| } \ |
| RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name, \ |
| TUnit::type unit, const string& parent_counter_name, bool* created) { \ |
| counter_map_lock_.DCheckLocked(); \ |
| DCHECK_EQ(is_averaged_profile_, false); \ |
| if (counter_map_.find(name) != counter_map_.end()) { \ |
| *created = false; \ |
| return reinterpret_cast<T*>(counter_map_[name]); \ |
| } \ |
| DCHECK(parent_counter_name == ROOT_COUNTER \ |
| || counter_map_.find(parent_counter_name) != counter_map_.end()); \ |
| T* counter = pool_->Add(new T(unit)); \ |
| counter_map_[name] = counter; \ |
| set<string>* child_counters = \ |
| FindOrInsert(&child_counter_map_, parent_counter_name, set<string>()); \ |
| child_counters->insert(name); \ |
| *created = true; \ |
| return counter; \ |
| } |
| |
| ADD_COUNTER_IMPL(AddCounter, Counter); |
| ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter); |
| ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter); |
| |
| RuntimeProfile::DerivedCounter* RuntimeProfile::AddDerivedCounter( |
| const string& name, TUnit::type unit, |
| const SampleFunction& counter_fn, const string& parent_counter_name) { |
| DCHECK_EQ(is_averaged_profile_, false); |
| lock_guard<SpinLock> l(counter_map_lock_); |
| if (counter_map_.find(name) != counter_map_.end()) return NULL; |
| DerivedCounter* counter = pool_->Add(new DerivedCounter(unit, counter_fn)); |
| counter_map_[name] = counter; |
| set<string>* child_counters = |
| FindOrInsert(&child_counter_map_, parent_counter_name, set<string>()); |
| child_counters->insert(name); |
| return counter; |
| } |
| |
| RuntimeProfile::ThreadCounters* RuntimeProfile::AddThreadCounters( |
| const string& prefix) { |
| ThreadCounters* counter = pool_->Add(new ThreadCounters()); |
| counter->total_time_ = AddCounter(prefix + THREAD_TOTAL_TIME, TUnit::TIME_NS); |
| counter->user_time_ = AddCounter(prefix + THREAD_USER_TIME, TUnit::TIME_NS, |
| prefix + THREAD_TOTAL_TIME); |
| counter->sys_time_ = AddCounter(prefix + THREAD_SYS_TIME, TUnit::TIME_NS, |
| prefix + THREAD_TOTAL_TIME); |
| counter->voluntary_context_switches_ = |
| AddCounter(prefix + THREAD_VOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT); |
| counter->involuntary_context_switches_ = |
| AddCounter(prefix + THREAD_INVOLUNTARY_CONTEXT_SWITCHES, TUnit::UNIT); |
| return counter; |
| } |
| |
| void RuntimeProfile::AddLocalTimeCounter(const SampleFunction& counter_fn) { |
| DerivedCounter* local_time_counter = pool_->Add( |
| new DerivedCounter(TUnit::TIME_NS, counter_fn)); |
| lock_guard<SpinLock> l(counter_map_lock_); |
| DCHECK(counter_map_.find(LOCAL_TIME_COUNTER_NAME) == counter_map_.end()) |
| << "LocalTimeCounter already exists in the map."; |
| counter_map_[LOCAL_TIME_COUNTER_NAME] = local_time_counter; |
| } |
| |
| RuntimeProfile::Counter* RuntimeProfile::GetCounter(const string& name) { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| if (counter_map_.find(name) != counter_map_.end()) { |
| return counter_map_[name]; |
| } |
| return NULL; |
| } |
| |
| RuntimeProfile::SummaryStatsCounter* RuntimeProfile::GetSummaryStatsCounter( |
| const string& name) { |
| lock_guard<SpinLock> l(summary_stats_map_lock_); |
| if (summary_stats_map_.find(name) != summary_stats_map_.end()) { |
| return summary_stats_map_[name]; |
| } |
| return nullptr; |
| } |
| |
| void RuntimeProfile::GetCounters(const string& name, vector<Counter*>* counters) { |
| Counter* c = GetCounter(name); |
| if (c != NULL) counters->push_back(c); |
| |
| lock_guard<SpinLock> l(children_lock_); |
| for (int i = 0; i < children_.size(); ++i) { |
| children_[i].first->GetCounters(name, counters); |
| } |
| } |
| |
| RuntimeProfile::EventSequence* RuntimeProfile::GetEventSequence(const string& name) const |
| { |
| lock_guard<SpinLock> l(event_sequence_lock_); |
| EventSequenceMap::const_iterator it = event_sequence_map_.find(name); |
| if (it == event_sequence_map_.end()) return NULL; |
| return it->second; |
| } |
| |
| void RuntimeProfile::ToJson(Document* d) const{ |
| // queryObj that stores all JSON format profile information |
| Value queryObj(kObjectType); |
| RuntimeProfile::ToJsonHelper(&queryObj, d); |
| d->RemoveMember("contents"); |
| d->AddMember("contents", queryObj, d->GetAllocator()); |
| } |
| |
| void RuntimeProfile::ToJsonCounters(Value* parent, Document* d, |
| const string& counter_name, const CounterMap& counter_map, |
| const ChildCounterMap& child_counter_map) const{ |
| auto& allocator = d->GetAllocator(); |
| ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name); |
| if (itr != child_counter_map.end()) { |
| const set<string>& child_counters = itr->second; |
| for (const string& child_counter: child_counters) { |
| CounterMap::const_iterator iter = counter_map.find(child_counter); |
| if (iter == counter_map.end()) continue; |
| |
| Value counter(kObjectType); |
| iter->second->ToJson(*d, &counter); |
| counter.AddMember("counter_name", StringRef(child_counter.c_str()), allocator); |
| |
| Value child_counters_json(kArrayType); |
| RuntimeProfile::ToJsonCounters(&child_counters_json, d, |
| child_counter, counter_map,child_counter_map); |
| if (!child_counters_json.Empty()){ |
| counter.AddMember("child_counters", child_counters_json, allocator); |
| } |
| parent->PushBack(counter, allocator); |
| } |
| } |
| } |
| |
| void RuntimeProfile::ToJsonHelper(Value* parent, Document* d) const{ |
| Document::AllocatorType& allocator = d->GetAllocator(); |
| // Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock |
| // while we call value() on the counters (some of those might be DerivedCounters). |
| CounterMap counter_map; |
| ChildCounterMap child_counter_map; |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| counter_map = counter_map_; |
| child_counter_map = child_counter_map_; |
| } |
| |
| // 1. Name |
| Value name(name_.c_str(), allocator); |
| parent->AddMember("profile_name", name, allocator); |
| |
| // 2. Num_children |
| parent->AddMember("num_children", children_.size(), allocator); |
| |
| // 3. Metadata |
| // Set the required metadata field to the plan node ID for compatibility with any tools |
| // that rely on the plan node id being set there. |
| // Legacy field. May contain the node ID for plan nodes. |
| // Replaced by node_metadata, which contains richer metadata. |
| parent->AddMember("metadata", |
| metadata_.__isset.plan_node_id ? metadata_.plan_node_id : -1, allocator); |
| // requires exactly one field of a union to be set so we only mark node_metadata |
| // as set if that is the case. |
| if (metadata_.__isset.plan_node_id || metadata_.__isset.data_sink_id){ |
| Value node_metadata_json(kObjectType); |
| if (metadata_.__isset.plan_node_id){ |
| node_metadata_json.AddMember("plan_node_id", metadata_.plan_node_id, allocator); |
| } |
| if (metadata_.__isset.data_sink_id){ |
| node_metadata_json.AddMember("data_sink_id", metadata_.data_sink_id, allocator); |
| } |
| parent->AddMember("node_metadata", node_metadata_json, allocator); |
| } |
| |
| // 4. Info_strings |
| { |
| lock_guard<SpinLock> l(info_strings_lock_); |
| if (!info_strings_.empty()) { |
| Value info_strings_json(kArrayType); |
| for (const string& key : info_strings_display_order_) { |
| Value info_string_json(kObjectType); |
| Value key_json(key.c_str(), allocator); |
| auto value_itr = info_strings_.find(key); |
| DCHECK(value_itr != info_strings_.end()); |
| Value value_json(value_itr->second.c_str(), allocator); |
| info_string_json.AddMember("key", key_json, allocator); |
| info_string_json.AddMember("value", value_json, allocator); |
| info_strings_json.PushBack(info_string_json, allocator); |
| } |
| parent->AddMember("info_strings", info_strings_json, allocator); |
| } |
| } |
| |
| // 5. Events |
| { |
| lock_guard<SpinLock> l(event_sequence_lock_); |
| if (!event_sequence_map_.empty()) { |
| Value event_sequences_json(kArrayType); |
| for (EventSequenceMap::const_iterator it = event_sequence_map_.begin(); |
| it != event_sequence_map_.end(); ++it) { |
| Value event_sequence_json(kObjectType); |
| it->second->ToJson(*d, &event_sequence_json); |
| event_sequences_json.PushBack(event_sequence_json, allocator); |
| } |
| parent->AddMember("event_sequences", event_sequences_json, allocator); |
| } |
| } |
| |
| |
| // 6. Counters |
| Value counters(kArrayType); |
| RuntimeProfile::ToJsonCounters(&counters , d, "", counter_map, child_counter_map); |
| if (!counters.Empty()) { |
| parent->AddMember("counters", counters, allocator); |
| } |
| |
| // 7. SummaryStatsCounter |
| { |
| lock_guard<SpinLock> l(summary_stats_map_lock_); |
| if (!summary_stats_map_.empty()) { |
| Value summary_stats_counters_json(kArrayType); |
| for (const SummaryStatsCounterMap::value_type& v : summary_stats_map_) { |
| Value summary_stats_counter(kObjectType); |
| Value summary_name(v.first.c_str(), allocator); |
| v.second->ToJson(*d, &summary_stats_counter); |
| // Remove Kind here because it would be redundant information for users |
| summary_stats_counter.RemoveMember("kind"); |
| summary_stats_counter.AddMember("counter_name", summary_name, allocator); |
| summary_stats_counters_json.PushBack(summary_stats_counter, allocator); |
| } |
| parent->AddMember( |
| "summary_stats_counters", summary_stats_counters_json, allocator); |
| } |
| } |
| |
| // 8. Time_series_counter_map |
| { |
| // Print all time series counters as following: |
| // - <Name> (<period>): <val1>, <val2>, <etc> |
| lock_guard<SpinLock> l(counter_map_lock_); |
| if (!time_series_counter_map_.empty()) { |
| Value time_series_counters_json(kArrayType); |
| for (const TimeSeriesCounterMap::value_type& v : time_series_counter_map_) { |
| TimeSeriesCounter* counter = v.second; |
| Value time_series_json(kObjectType); |
| counter->ToJson(*d, &time_series_json); |
| time_series_counters_json.PushBack(time_series_json, allocator); |
| } |
| parent->AddMember("time_series_counters", time_series_counters_json, allocator); |
| } |
| } |
| |
| // 9. Children Runtime Profiles |
| // |
| // Create copy of children_ so we don't need to hold lock while we call |
| // ToJsonHelper() on the children. |
| ChildVector children; |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| children = children_; |
| } |
| |
| if (!children.empty()) { |
| Value child_profiles(kArrayType); |
| for (int i = 0; i < children.size(); ++i) { |
| RuntimeProfile* profile = children[i].first; |
| Value child_profile(kObjectType); |
| profile->ToJsonHelper(&child_profile, d); |
| child_profiles.PushBack(child_profile, allocator); |
| } |
| parent->AddMember("child_profiles", child_profiles, allocator); |
| } |
| } |
| |
| // Print the profile: |
| // 1. Profile Name |
| // 2. Info Strings |
| // 3. Counters |
| // 4. Children |
| void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const { |
| ostream& stream = *s; |
| |
| // Create copy of counter_map_ and child_counter_map_ so we don't need to hold lock |
| // while we call value() on the counters (some of those might be DerivedCounters). |
| CounterMap counter_map; |
| ChildCounterMap child_counter_map; |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| counter_map = counter_map_; |
| child_counter_map = child_counter_map_; |
| } |
| |
| map<string, Counter*>::const_iterator total_time = |
| counter_map.find(TOTAL_TIME_COUNTER_NAME); |
| DCHECK(total_time != counter_map.end()); |
| |
| stream.flags(ios::fixed); |
| stream << prefix << name_ << ":"; |
| if (total_time->second->value() != 0) { |
| stream << "(Total: " |
| << PrettyPrinter::Print(total_time->second->value(), |
| total_time->second->unit()) |
| << ", non-child: " |
| << PrettyPrinter::Print(local_time_ns_, TUnit::TIME_NS) |
| << ", % non-child: " |
| << setprecision(2) << local_time_percent_ |
| << "%)"; |
| } |
| stream << endl; |
| |
| { |
| lock_guard<SpinLock> l(info_strings_lock_); |
| for (const string& key: info_strings_display_order_) { |
| stream << prefix << " " << key << ": " << info_strings_.find(key)->second << endl; |
| } |
| } |
| |
| { |
| // Print all the event timers as the following: |
| // <EventKey> Timeline: 2s719ms |
| // - Event 1: 6.522us (6.522us) |
| // - Event 2: 2s288ms (2s288ms) |
| // - Event 3: 2s410ms (121.138ms) |
| // The times in parentheses are the time elapsed since the last event. |
| vector<EventSequence::Event> events; |
| lock_guard<SpinLock> l(event_sequence_lock_); |
| for (const EventSequenceMap::value_type& event_sequence: event_sequence_map_) { |
| // If the stopwatch has never been started (e.g. because this sequence came from |
| // Thrift), look for the last element to tell us the total runtime. For |
| // currently-updating sequences, it's better to use the stopwatch value because that |
| // updates continuously. |
| int64_t last = event_sequence.second->ElapsedTime(); |
| event_sequence.second->GetEvents(&events); |
| if (last == 0 && events.size() > 0) last = events.back().second; |
| stream << prefix << " " << event_sequence.first << ": " |
| << PrettyPrinter::Print(last, TUnit::TIME_NS) |
| << endl; |
| |
| int64_t prev = 0L; |
| event_sequence.second->GetEvents(&events); |
| for (const EventSequence::Event& event: events) { |
| stream << prefix << " - " << event.first << ": " |
| << PrettyPrinter::Print(event.second, TUnit::TIME_NS) << " (" |
| << PrettyPrinter::Print(event.second - prev, TUnit::TIME_NS) << ")" |
| << endl; |
| prev = event.second; |
| } |
| } |
| } |
| |
| { |
| // Print all time series counters as following: |
| // - <Name> (<period>): <val1>, <val2>, <etc> |
| lock_guard<SpinLock> l(counter_map_lock_); |
| for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) { |
| const TimeSeriesCounter* counter = v.second; |
| lock_guard<SpinLock> l(counter->lock_); |
| int num, period; |
| const int64_t* samples = counter->GetSamplesLocked(&num, &period); |
| if (num > 0) { |
| // Clamp number of printed values at 64, the maximum number of values in the |
| // SamplingTimeSeriesCounter. |
| int step = 1 + (num - 1) / 64; |
| period *= step; |
| stream << prefix << " - " << v.first << " (" |
| << PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS) << "): "; |
| for (int i = 0; i < num; i += step) { |
| stream << PrettyPrinter::Print(samples[i], counter->unit()); |
| if (i + step < num) stream << ", "; |
| } |
| if (step > 1) { |
| stream << " (Showing " << ((num + 1) / step) << " of " << num << " values from " |
| "Thrift Profile)"; |
| } |
| stream << endl; |
| } |
| } |
| } |
| |
| { |
| lock_guard<SpinLock> l(summary_stats_map_lock_); |
| // Print all SummaryStatsCounters as following: |
| // <Name>: (Avg: <value> ; Min: <min_value> ; Max: <max_value> ; |
| // Number of samples: <total>) |
| for (const SummaryStatsCounterMap::value_type& v: summary_stats_map_) { |
| if (v.second->TotalNumValues() == 0) { |
| // No point printing all the stats if number of samples is zero. |
| stream << prefix << " - " << v.first << ": " |
| << PrettyPrinter::Print(v.second->value(), v.second->unit(), true) |
| << " (Number of samples: " << v.second->TotalNumValues() << ")" << endl; |
| } else { |
| stream << prefix << " - " << v.first << ": (Avg: " |
| << PrettyPrinter::Print(v.second->value(), v.second->unit(), true) |
| << " ; Min: " |
| << PrettyPrinter::Print(v.second->MinValue(), v.second->unit(), true) |
| << " ; Max: " |
| << PrettyPrinter::Print(v.second->MaxValue(), v.second->unit(), true) |
| << " ; Number of samples: " << v.second->TotalNumValues() << ")" << endl; |
| } |
| } |
| } |
| RuntimeProfile::PrintChildCounters( |
| prefix, ROOT_COUNTER, counter_map, child_counter_map, s); |
| |
| // Create copy of children_ so we don't need to hold lock while we call |
| // PrettyPrint() on the children. |
| ChildVector children; |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| children = children_; |
| } |
| for (int i = 0; i < children.size(); ++i) { |
| RuntimeProfile* profile = children[i].first; |
| bool indent = children[i].second; |
| profile->PrettyPrint(s, prefix + (indent ? " " : "")); |
| } |
| } |
| |
| Status RuntimeProfile::SerializeToArchiveString(string* out) const { |
| stringstream ss; |
| RETURN_IF_ERROR(SerializeToArchiveString(&ss)); |
| *out = ss.str(); |
| return Status::OK(); |
| } |
| |
| Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const { |
| Status status; |
| TRuntimeProfileTree thrift_object; |
| const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object); |
| ThriftSerializer serializer(true); |
| vector<uint8_t> serialized_buffer; |
| RETURN_IF_ERROR(serializer.SerializeToVector(&thrift_object, &serialized_buffer)); |
| |
| // Compress the serialized thrift string. This uses string keys and is very |
| // easy to compress. |
| scoped_ptr<Codec> compressor; |
| Codec::CodecInfo codec_info(THdfsCompression::DEFAULT); |
| RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, codec_info, &compressor)); |
| const auto close_compressor = |
| MakeScopeExitTrigger([&compressor]() { compressor->Close(); }); |
| |
| vector<uint8_t> compressed_buffer; |
| int64_t max_compressed_size = compressor->MaxOutputLen(serialized_buffer.size()); |
| DCHECK_GT(max_compressed_size, 0); |
| compressed_buffer.resize(max_compressed_size); |
| int64_t result_len = compressed_buffer.size(); |
| uint8_t* compressed_buffer_ptr = compressed_buffer.data(); |
| RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(), |
| serialized_buffer.data(), &result_len, &compressed_buffer_ptr)); |
| compressed_buffer.resize(result_len); |
| |
| Base64Encode(compressed_buffer, out); |
| return Status::OK();; |
| } |
| |
| Status RuntimeProfile::DeserializeFromArchiveString( |
| const std::string& archive_str, TRuntimeProfileTree* out) { |
| int64_t decoded_max; |
| if (!Base64DecodeBufLen(archive_str.c_str(), archive_str.size(), &decoded_max)) { |
| return Status("Error in DeserializeFromArchiveString: Base64DecodeBufLen failed."); |
| } |
| |
| vector<uint8_t> decoded_buffer; |
| decoded_buffer.resize(decoded_max); |
| int64_t decoded_len; |
| if (!Base64Decode(archive_str.c_str(), archive_str.size(), decoded_max, |
| reinterpret_cast<char*>(decoded_buffer.data()), &decoded_len)) { |
| return Status("Error in DeserializeFromArchiveString: Base64Decode failed."); |
| } |
| decoded_buffer.resize(decoded_len); |
| |
| scoped_ptr<Codec> decompressor; |
| MemTracker mem_tracker; |
| MemPool mem_pool(&mem_tracker); |
| const auto close_mem_tracker = MakeScopeExitTrigger([&mem_pool, &mem_tracker]() { |
| mem_pool.FreeAll(); |
| mem_tracker.Close(); |
| }); |
| RETURN_IF_ERROR(Codec::CreateDecompressor( |
| &mem_pool, false, THdfsCompression::DEFAULT, &decompressor)); |
| const auto close_decompressor = |
| MakeScopeExitTrigger([&decompressor]() { decompressor->Close(); }); |
| |
| int64_t result_len; |
| uint8_t* decompressed_buffer; |
| RETURN_IF_ERROR(decompressor->ProcessBlock( |
| false, decoded_len, decoded_buffer.data(), &result_len, &decompressed_buffer)); |
| |
| uint32_t deserialized_len = static_cast<uint32_t>(result_len); |
| RETURN_IF_ERROR( |
| DeserializeThriftMsg(decompressed_buffer, &deserialized_len, true, out)); |
| return Status::OK(); |
| } |
| |
| void RuntimeProfile::SetTExecSummary(const TExecSummary& summary) { |
| lock_guard<SpinLock> l(t_exec_summary_lock_); |
| t_exec_summary_ = summary; |
| } |
| |
| void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const { |
| tree->nodes.clear(); |
| ToThrift(&tree->nodes); |
| ExecSummaryToThrift(tree); |
| } |
| |
| void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const { |
| int index = nodes->size(); |
| nodes->push_back(TRuntimeProfileNode()); |
| TRuntimeProfileNode& node = (*nodes)[index]; |
| node.name = name_; |
| // Set the required metadata field to the plan node ID for compatibility with any tools |
| // that rely on the plan node id being set there. |
| node.metadata = metadata_.__isset.plan_node_id ? metadata_.plan_node_id : -1; |
| // Thrift requires exactly one field of a union to be set so we only mark node_metadata |
| // as set if that is the case. |
| if (metadata_.__isset.plan_node_id || metadata_.__isset.data_sink_id) { |
| node.__set_node_metadata(metadata_); |
| } |
| node.indent = true; |
| |
| CounterMap counter_map; |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| counter_map = counter_map_; |
| node.child_counters_map = child_counter_map_; |
| } |
| for (map<string, Counter*>::const_iterator iter = counter_map.begin(); |
| iter != counter_map.end(); ++iter) { |
| TCounter counter; |
| counter.name = iter->first; |
| counter.value = iter->second->value(); |
| counter.unit = iter->second->unit(); |
| node.counters.push_back(counter); |
| } |
| |
| { |
| lock_guard<SpinLock> l(info_strings_lock_); |
| node.info_strings = info_strings_; |
| node.info_strings_display_order = info_strings_display_order_; |
| } |
| |
| { |
| vector<EventSequence::Event> events; |
| lock_guard<SpinLock> l(event_sequence_lock_); |
| if (event_sequence_map_.size() != 0) { |
| node.__set_event_sequences(vector<TEventSequence>()); |
| node.event_sequences.resize(event_sequence_map_.size()); |
| int idx = 0; |
| for (const EventSequenceMap::value_type& val: event_sequence_map_) { |
| TEventSequence* seq = &node.event_sequences[idx++]; |
| seq->name = val.first; |
| val.second->GetEvents(&events); |
| for (const EventSequence::Event& ev: events) { |
| seq->labels.push_back(ev.first); |
| seq->timestamps.push_back(ev.second); |
| } |
| } |
| } |
| } |
| |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| if (time_series_counter_map_.size() != 0) { |
| node.__set_time_series_counters( |
| vector<TTimeSeriesCounter>(time_series_counter_map_.size())); |
| int idx = 0; |
| for (const TimeSeriesCounterMap::value_type& val: time_series_counter_map_) { |
| val.second->ToThrift(&node.time_series_counters[idx++]); |
| } |
| } |
| } |
| |
| { |
| lock_guard<SpinLock> l(summary_stats_map_lock_); |
| if (summary_stats_map_.size() != 0) { |
| node.__set_summary_stats_counters( |
| vector<TSummaryStatsCounter>(summary_stats_map_.size())); |
| int idx = 0; |
| for (const SummaryStatsCounterMap::value_type& val: summary_stats_map_) { |
| val.second->ToThrift(&node.summary_stats_counters[idx++], val.first); |
| } |
| } |
| } |
| |
| ChildVector children; |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| children = children_; |
| node.num_children = children_.size(); |
| } |
| for (int i = 0; i < children.size(); ++i) { |
| int child_idx = nodes->size(); |
| children[i].first->ToThrift(nodes); |
| // fix up indentation flag |
| (*nodes)[child_idx].indent = children[i].second; |
| } |
| } |
| |
| void RuntimeProfile::ExecSummaryToThrift(TRuntimeProfileTree* tree) const { |
| GetExecSummary(&tree->exec_summary); |
| tree->__isset.exec_summary = true; |
| } |
| |
| void RuntimeProfile::GetExecSummary(TExecSummary* t_exec_summary) const { |
| lock_guard<SpinLock> l(t_exec_summary_lock_); |
| *t_exec_summary = t_exec_summary_; |
| } |
| |
| void RuntimeProfile::SetPlanNodeId(int node_id) { |
| DCHECK(!metadata_.__isset.data_sink_id) << "Don't set conflicting metadata"; |
| metadata_.__set_plan_node_id(node_id); |
| } |
| |
| void RuntimeProfile::SetDataSinkId(int sink_id) { |
| DCHECK(!metadata_.__isset.plan_node_id) << "Don't set conflicting metadata"; |
| metadata_.__set_data_sink_id(sink_id); |
| } |
| |
| int64_t RuntimeProfile::UnitsPerSecond( |
| const RuntimeProfile::Counter* total_counter, const RuntimeProfile::Counter* timer) { |
| DCHECK(total_counter->unit() == TUnit::BYTES || total_counter->unit() == TUnit::UNIT); |
| DCHECK(timer->unit() == TUnit::TIME_NS); |
| |
| if (timer->value() == 0) return 0; |
| double secs = static_cast<double>(timer->value()) / 1000.0 / 1000.0 / 1000.0; |
| return total_counter->value() / secs; |
| } |
| |
| int64_t RuntimeProfile::CounterSum(const vector<Counter*>* counters) { |
| int64_t value = 0; |
| for (int i = 0; i < counters->size(); ++i) { |
| value += (*counters)[i]->value(); |
| } |
| return value; |
| } |
| |
| RuntimeProfile::Counter* RuntimeProfile::AddRateCounter( |
| const string& name, Counter* src_counter) { |
| TUnit::type dst_unit; |
| switch (src_counter->unit()) { |
| case TUnit::BYTES: |
| dst_unit = TUnit::BYTES_PER_SECOND; |
| break; |
| case TUnit::UNIT: |
| dst_unit = TUnit::UNIT_PER_SECOND; |
| break; |
| default: |
| DCHECK(false) << "Unsupported src counter unit: " << src_counter->unit(); |
| return NULL; |
| } |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| bool created; |
| Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created); |
| if (!created) return dst_counter; |
| rate_counters_.push_back(dst_counter); |
| PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter, |
| PeriodicCounterUpdater::RATE_COUNTER); |
| has_active_periodic_counters_ = true; |
| return dst_counter; |
| } |
| } |
| |
| RuntimeProfile::Counter* RuntimeProfile::AddRateCounter( |
| const string& name, SampleFunction fn, TUnit::type dst_unit) { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| bool created; |
| Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created); |
| if (!created) return dst_counter; |
| rate_counters_.push_back(dst_counter); |
| PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter, |
| PeriodicCounterUpdater::RATE_COUNTER); |
| has_active_periodic_counters_ = true; |
| return dst_counter; |
| } |
| |
| RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter( |
| const string& name, Counter* src_counter) { |
| DCHECK(src_counter->unit() == TUnit::UNIT); |
| lock_guard<SpinLock> l(counter_map_lock_); |
| bool created; |
| Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created); |
| if (!created) return dst_counter; |
| sampling_counters_.push_back(dst_counter); |
| PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter, |
| PeriodicCounterUpdater::SAMPLING_COUNTER); |
| has_active_periodic_counters_ = true; |
| return dst_counter; |
| } |
| |
| RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter( |
| const string& name, SampleFunction sample_fn) { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| bool created; |
| Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created); |
| if (!created) return dst_counter; |
| sampling_counters_.push_back(dst_counter); |
| PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter, |
| PeriodicCounterUpdater::SAMPLING_COUNTER); |
| has_active_periodic_counters_ = true; |
| return dst_counter; |
| } |
| |
| vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters( |
| Counter* src_counter, int num_buckets) { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>); |
| for (int i = 0; i < num_buckets; ++i) { |
| buckets->push_back( |
| pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0))); |
| } |
| bucketing_counters_.insert(buckets); |
| has_active_periodic_counters_ = true; |
| PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets); |
| return buckets; |
| } |
| |
| RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name) { |
| lock_guard<SpinLock> l(event_sequence_lock_); |
| EventSequenceMap::iterator timer_it = event_sequence_map_.find(name); |
| if (timer_it != event_sequence_map_.end()) return timer_it->second; |
| |
| EventSequence* timer = pool_->Add(new EventSequence()); |
| event_sequence_map_[name] = timer; |
| return timer; |
| } |
| |
| RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name, |
| const TEventSequence& from) { |
| lock_guard<SpinLock> l(event_sequence_lock_); |
| EventSequenceMap::iterator timer_it = event_sequence_map_.find(name); |
| if (timer_it != event_sequence_map_.end()) return timer_it->second; |
| |
| EventSequence* timer = pool_->Add(new EventSequence(from.timestamps, from.labels)); |
| event_sequence_map_[name] = timer; |
| return timer; |
| } |
| |
| void RuntimeProfile::PrintChildCounters(const string& prefix, |
| const string& counter_name, const CounterMap& counter_map, |
| const ChildCounterMap& child_counter_map, ostream* s) { |
| ostream& stream = *s; |
| ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name); |
| if (itr != child_counter_map.end()) { |
| const set<string>& child_counters = itr->second; |
| for (const string& child_counter: child_counters) { |
| CounterMap::const_iterator iter = counter_map.find(child_counter); |
| if (iter == counter_map.end()) continue; |
| stream << prefix << " - " << iter->first << ": " |
| << PrettyPrinter::Print(iter->second->value(), iter->second->unit(), true) |
| << endl; |
| RuntimeProfile::PrintChildCounters(prefix + " ", child_counter, counter_map, |
| child_counter_map, s); |
| } |
| } |
| } |
| |
| RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter( |
| const string& name, TUnit::type unit, const std::string& parent_counter_name) { |
| DCHECK_EQ(is_averaged_profile_, false); |
| lock_guard<SpinLock> l(summary_stats_map_lock_); |
| if (summary_stats_map_.find(name) != summary_stats_map_.end()) { |
| return summary_stats_map_[name]; |
| } |
| SummaryStatsCounter* counter = pool_->Add(new SummaryStatsCounter(unit)); |
| summary_stats_map_[name] = counter; |
| return counter; |
| } |
| |
| RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter( |
| const string& name, TUnit::type unit, SampleFunction fn) { |
| DCHECK(fn != nullptr); |
| lock_guard<SpinLock> l(counter_map_lock_); |
| TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name); |
| if (it != time_series_counter_map_.end()) return it->second; |
| TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn)); |
| time_series_counter_map_[name] = counter; |
| PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter); |
| has_active_periodic_counters_ = true; |
| return counter; |
| } |
| |
| RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter( |
| const string& name, Counter* src_counter) { |
| DCHECK(src_counter != NULL); |
| return AddSamplingTimeSeriesCounter(name, src_counter->unit(), |
| bind(&Counter::value, src_counter)); |
| } |
| |
| void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) { |
| lock_guard<SpinLock> l(lock_); |
| int64_t sample = sample_fn_(); |
| AddSampleLocked(sample, ms_elapsed); |
| } |
| |
| const int64_t* RuntimeProfile::TimeSeriesCounter::GetSamplesLockedForSend( |
| int* num_samples, int* period) { |
| return GetSamplesLocked(num_samples, period); |
| } |
| |
| void RuntimeProfile::TimeSeriesCounter::SetSamples( |
| int period, const std::vector<int64_t>& samples, int64_t start_idx) { |
| DCHECK(false); |
| } |
| |
| void RuntimeProfile::SamplingTimeSeriesCounter::AddSampleLocked( |
| int64_t sample, int ms_elapsed){ |
| samples_.AddSample(sample, ms_elapsed); |
| } |
| |
| const int64_t* RuntimeProfile::SamplingTimeSeriesCounter::GetSamplesLocked( |
| int* num_samples, int* period) const { |
| return samples_.GetSamples(num_samples, period); |
| } |
| |
| RuntimeProfile::ChunkedTimeSeriesCounter::ChunkedTimeSeriesCounter( |
| const string& name, TUnit::type unit, SampleFunction fn) |
| : TimeSeriesCounter(name, unit, fn) |
| , period_(FLAGS_periodic_counter_update_period_ms) |
| , max_size_(10 * FLAGS_status_report_interval_ms / period_) {} |
| |
| void RuntimeProfile::ChunkedTimeSeriesCounter::Clear() { |
| lock_guard<SpinLock> l(lock_); |
| previous_sample_count_ += last_get_count_; |
| values_.erase(values_.begin(), values_.begin() + last_get_count_); |
| last_get_count_ = 0; |
| } |
| |
| void RuntimeProfile::ChunkedTimeSeriesCounter::AddSampleLocked( |
| int64_t sample, int ms_elapsed) { |
| // We chose inefficiently erasing elements from a vector over using a std::deque because |
| // this should only happen very infrequently and we rely on contiguous storage in |
| // GetSamplesLocked*(). |
| if (max_size_ > 0 && values_.size() == max_size_) { |
| KLOG_EVERY_N_SECS(WARNING, 60) << "ChunkedTimeSeriesCounter reached maximum size"; |
| values_.erase(values_.begin(), values_.begin() + 1); |
| } |
| DCHECK_LT(values_.size(), max_size_); |
| values_.push_back(sample); |
| } |
| |
| const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLocked( |
| int* num_samples, int* period) const { |
| DCHECK(num_samples != nullptr); |
| DCHECK(period != nullptr); |
| *num_samples = values_.size(); |
| *period = period_; |
| return values_.data(); |
| } |
| |
| const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLockedForSend( |
| int* num_samples, int* period) { |
| last_get_count_ = values_.size(); |
| return GetSamplesLocked(num_samples, period); |
| } |
| |
| void RuntimeProfile::ChunkedTimeSeriesCounter::SetSamples( |
| int period, const std::vector<int64_t>& samples, int64_t start_idx) { |
| lock_guard<SpinLock> l(lock_); |
| if (start_idx == 0) { |
| // This could be coming from a SamplingTimeSeriesCounter or another |
| // ChunkedTimeSeriesCounter. |
| period_ = period; |
| values_ = samples; |
| return; |
| } |
| // Only ChunkedTimeSeriesCounter will set start_idx > 0. |
| DCHECK_GT(start_idx, 0); |
| DCHECK_EQ(period_, period); |
| if (values_.size() < start_idx) { |
| // Fill up with 0. |
| values_.resize(start_idx); |
| } |
| DCHECK_GE(values_.size(), start_idx); |
| // Skip values we already received. |
| auto start_it = samples.begin() + values_.size() - start_idx; |
| values_.insert(values_.end(), start_it, samples.end()); |
| } |
| |
| RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddChunkedTimeSeriesCounter( |
| const string& name, TUnit::type unit, SampleFunction fn) { |
| DCHECK(fn != nullptr); |
| lock_guard<SpinLock> l(counter_map_lock_); |
| TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name); |
| if (it != time_series_counter_map_.end()) return it->second; |
| TimeSeriesCounter* counter = pool_->Add(new ChunkedTimeSeriesCounter(name, unit, fn)); |
| time_series_counter_map_[name] = counter; |
| PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter); |
| has_active_periodic_counters_ = true; |
| return counter; |
| } |
| |
| void RuntimeProfile::ClearChunkedTimeSeriesCounters() { |
| { |
| lock_guard<SpinLock> l(counter_map_lock_); |
| for (auto& it : time_series_counter_map_) it.second->Clear(); |
| } |
| { |
| lock_guard<SpinLock> l(children_lock_); |
| for (int i = 0; i < children_.size(); ++i) { |
| children_[i].first->ClearChunkedTimeSeriesCounters(); |
| } |
| } |
| } |
| |
| void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) { |
| lock_guard<SpinLock> l(lock_); |
| int num, period; |
| const int64_t* samples = GetSamplesLockedForSend(&num, &period); |
| counter->values.resize(num); |
| Ubsan::MemCpy(counter->values.data(), samples, num * sizeof(int64_t)); |
| |
| counter->name = name_; |
| counter->unit = unit_; |
| counter->period_ms = period; |
| counter->__set_start_index(previous_sample_count_); |
| } |
| |
| void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) { |
| lock_guard<SpinLock> l(lock_); |
| /// It's possible that concurrent events can be logged out of sequence so we sort the |
| /// events before serializing them. |
| SortEvents(); |
| for (const EventSequence::Event& ev: events_) { |
| seq->labels.push_back(ev.first); |
| seq->timestamps.push_back(ev.second); |
| } |
| } |
| |
| void RuntimeProfile::SummaryStatsCounter::ToThrift(TSummaryStatsCounter* counter, |
| const std::string& name) { |
| lock_guard<SpinLock> l(lock_); |
| counter->name = name; |
| counter->unit = unit_; |
| counter->sum = sum_; |
| counter->total_num_values = total_num_values_; |
| counter->min_value = min_; |
| counter->max_value = max_; |
| } |
| |
| void RuntimeProfile::SummaryStatsCounter::UpdateCounter(int64_t new_value) { |
| lock_guard<SpinLock> l(lock_); |
| |
| ++total_num_values_; |
| sum_ += new_value; |
| value_.Store(sum_ / total_num_values_); |
| |
| if (new_value < min_) min_ = new_value; |
| if (new_value > max_) max_ = new_value; |
| } |
| |
| void RuntimeProfile::SummaryStatsCounter::SetStats(const TSummaryStatsCounter& counter) { |
| // We drop this input if it looks malformed. |
| if (counter.total_num_values < 0) return; |
| lock_guard<SpinLock> l(lock_); |
| unit_ = counter.unit; |
| sum_ = counter.sum; |
| total_num_values_ = counter.total_num_values; |
| min_ = counter.min_value; |
| max_ = counter.max_value; |
| |
| value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_); |
| } |
| |
| int64_t RuntimeProfile::SummaryStatsCounter::MinValue() { |
| lock_guard<SpinLock> l(lock_); |
| return min_; |
| } |
| |
| int64_t RuntimeProfile::SummaryStatsCounter::MaxValue() { |
| lock_guard<SpinLock> l(lock_); |
| return max_; |
| } |
| |
| int32_t RuntimeProfile::SummaryStatsCounter::TotalNumValues() { |
| lock_guard<SpinLock> l(lock_); |
| return total_num_values_; |
| } |
| |
| void RuntimeProfile::Counter::ToJson(Document& document, Value* val) const { |
| Value counter_json(kObjectType); |
| counter_json.AddMember("value", value(), document.GetAllocator()); |
| auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_); |
| DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end()); |
| Value unit_json(unit_itr->second, document.GetAllocator()); |
| counter_json.AddMember("unit", unit_json, document.GetAllocator()); |
| Value kind_json(CounterType().c_str(), document.GetAllocator()); |
| counter_json.AddMember("kind", kind_json, document.GetAllocator()); |
| *val = counter_json; |
| } |
| |
| void RuntimeProfile::TimeSeriesCounter::ToJson(Document& document, Value* val) { |
| lock_guard<SpinLock> lock(lock_); |
| Value counter_json(kObjectType); |
| counter_json.AddMember("counter_name", |
| StringRef(name_.c_str()), document.GetAllocator()); |
| auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_); |
| DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end()); |
| Value unit_json(unit_itr->second, document.GetAllocator()); |
| counter_json.AddMember("unit", unit_json, document.GetAllocator()); |
| |
| int num, period; |
| const int64_t* samples = GetSamplesLocked(&num, &period); |
| |
| counter_json.AddMember("num", num, document.GetAllocator()); |
| counter_json.AddMember("period", period, document.GetAllocator()); |
| stringstream stream; |
| // Clamp number of printed values at 64, the maximum number of values in the |
| // SamplingTimeSeriesCounter. |
| int step = 1 + (num - 1) / 64; |
| period *= step; |
| |
| for (int i = 0; i < num; i += step) { |
| stream << samples[i]; |
| if (i + step < num) stream << ","; |
| } |
| |
| Value samples_data_json(stream.str().c_str(), document.GetAllocator()); |
| counter_json.AddMember("data", samples_data_json, document.GetAllocator()); |
| *val = counter_json; |
| } |
| |
| void RuntimeProfile::EventSequence::ToJson(Document& document, Value* value) { |
| boost::lock_guard<SpinLock> event_lock(lock_); |
| SortEvents(); |
| |
| Value event_sequence_json(kObjectType); |
| event_sequence_json.AddMember("offset", offset_, document.GetAllocator()); |
| |
| Value events_json(kArrayType); |
| |
| for (const Event& ev: events_) { |
| Value event_json(kObjectType); |
| event_json.AddMember("label", StringRef(ev.first.c_str()), document.GetAllocator()); |
| event_json.AddMember("timestamp", ev.second, document.GetAllocator()); |
| events_json.PushBack(event_json, document.GetAllocator()); |
| } |
| |
| event_sequence_json.AddMember("events", events_json, document.GetAllocator()); |
| *value = event_sequence_json; |
| } |
| |
| } |