blob: f6b3300860fcca208397fc35d41c97444a566337 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "manager/tmetrics-collector.h"
#include <iostream>
#include <list>
#include <map>
#include <string>
#include "metrics/tmanager-metrics.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "zookeeper/zkclient.h"
#include "proto/metrics.pb.h"
#include "proto/tmanager.pb.h"
#include "proto/topology.pb.h"
#include "config/heron-internals-config-reader.h"
namespace {
// File-local short names for the metrics and protobuf types used throughout this file.
using TManagerMetrics = heron::common::TManagerMetrics;
using ExceptionLogRequest = heron::proto::tmanager::ExceptionLogRequest;
using ExceptionLogResponse = heron::proto::tmanager::ExceptionLogResponse;
using MetricRequest = heron::proto::tmanager::MetricRequest;
using MetricResponse = heron::proto::tmanager::MetricResponse;
using IndividualMetric = heron::proto::tmanager::MetricResponse::IndividualMetric;
using IntervalValue = heron::proto::tmanager::MetricResponse::IndividualMetric::IntervalValue;
using TmanagerExceptionLog = heron::proto::tmanager::TmanagerExceptionLog;
using PublishMetrics = heron::proto::tmanager::PublishMetrics;
}  // namespace
namespace heron {
namespace tmanager {
// Builds a collector that keeps `_max_interval` seconds of history, split into buckets of
// the configured purge interval (HeronTmanagerMetricsCollectorPurgeIntervalSec), and
// schedules the first Purge() on `eventLoop`.
//
// CHECK-fails unless the purge interval is positive and evenly divides `_max_interval`.
// NOTE(review): the purge timer's callback captures `this` and the destructor does not
// cancel it; the collector must not be destroyed while the event loop can still fire it.
TMetricsCollector::TMetricsCollector(sp_int32 _max_interval, std::shared_ptr<EventLoop> eventLoop,
                                     const std::string& metrics_sinks_yaml)
    : max_interval_(_max_interval),
      eventLoop_(eventLoop),
      metrics_sinks_yaml_(metrics_sinks_yaml),
      tmetrics_info_(make_unique<common::TManagerMetrics>(metrics_sinks_yaml, eventLoop)),
      start_time_(time(NULL)) {
  interval_ = config::HeronInternalsConfigReader::Instance()
                  ->GetHeronTmanagerMetricsCollectorPurgeIntervalSec();
  // Guard the modulo below: a zero purge interval would otherwise be an integer division
  // by zero (SIGFPE) instead of a readable CHECK failure.
  CHECK_GT(interval_, 0);
  CHECK_EQ(max_interval_ % interval_, 0);
  nintervals_ = max_interval_ / interval_;
  auto cb = [this](EventLoop::Status status) { this->Purge(status); };
  // Widen before scaling seconds to microseconds so long purge intervals (> ~2147 s)
  // cannot overflow a 32-bit product.
  CHECK_GT(eventLoop_->registerTimer(std::move(cb), false,
                                     static_cast<sp_int64>(interval_) * 1000000),
           0);
}
// Defaulted: owned state lives in smart-pointer/container members.
// NOTE(review): the purge timer registered in the constructor captures `this` and is not
// cancelled here; confirm the event loop stops dispatching before destruction.
TMetricsCollector::~TMetricsCollector() = default;
// Timer callback: rotates the time buckets of every component, then re-registers itself.
// The timer is registered with `false` as its second argument (presumably non-repeating),
// so each firing schedules the next one.
void TMetricsCollector::Purge(EventLoop::Status) {
  for (auto& entry : metrics_) {
    entry.second->Purge();
  }
  auto cb = [this](EventLoop::Status status) { this->Purge(status); };
  // Widen before scaling seconds to microseconds to avoid 32-bit overflow.
  CHECK_GT(eventLoop_->registerTimer(std::move(cb), false,
                                     static_cast<sp_int64>(interval_) * 1000000),
           0);
}
// Records one metric datum under `component_name`, creating the component and instance
// containers on first use. The aggregation type is looked up from `tmetrics_info_` by the
// metric's name.
void TMetricsCollector::AddMetricsForComponent(const sp_string& component_name,
                                               const proto::tmanager::MetricDatum& metrics_data) {
  auto component = GetOrCreateComponentMetrics(component_name);
  const sp_string& metric_name = metrics_data.name();
  const TManagerMetrics::MetricAggregationType aggregation =
      tmetrics_info_->GetAggregationType(metric_name);
  component->AddMetricForInstance(metrics_data.instance_id(), metric_name, aggregation,
                                  metrics_data.value());
}
// Stores `exception_log` under `component_name`, filed by the log's own instance id.
void TMetricsCollector::AddExceptionsForComponent(const sp_string& component_name,
                                                  const TmanagerExceptionLog& exception_log) {
  GetOrCreateComponentMetrics(component_name)
      ->AddExceptionForInstance(exception_log.instance_id(), exception_log);
}
// Ingests one PublishMetrics batch. All metric data is recorded first, then all exception
// logs, each routed to the component named inside the entry itself.
void TMetricsCollector::AddMetric(const PublishMetrics& _metrics) {
  for (const auto& datum : _metrics.metrics()) {
    AddMetricsForComponent(datum.component_name(), datum);
  }
  for (const auto& exception_log : _metrics.exceptions()) {
    AddExceptionsForComponent(exception_log.component_name(), exception_log);
  }
}
// Answers a MetricRequest for one component over the requested time window.
//
// Returns NOTOK when:
//  - nothing has been recorded for the component. The message distinguishes a component
//    declared in `_topology` (spout or bolt) from an unknown name.
//  - the request has neither `interval` nor `explicit_interval`.
// With `interval`, the window ends now and starts `interval` seconds earlier. A
// non-positive `interval` yields start_time == 0, i.e. "since the beginning".
// Otherwise the per-instance data is filled in and `interval` is set to end - start.
unique_ptr<MetricResponse> TMetricsCollector::GetMetrics(const MetricRequest& _request,
                                                         const proto::api::Topology& _topology) {
  auto response = make_unique<MetricResponse>();
  const sp_string& component = _request.component_name();
  auto found = metrics_.find(component);

  if (found == metrics_.end()) {
    bool declared = false;
    for (int i = 0; !declared && i < _topology.spouts_size(); ++i) {
      declared = _topology.spouts(i).comp().name() == component;
    }
    for (int i = 0; !declared && i < _topology.bolts_size(); ++i) {
      declared = _topology.bolts(i).comp().name() == component;
    }
    response->mutable_status()->set_status(proto::system::NOTOK);
    if (declared) {
      LOG(WARNING) << "Metrics for component `" << component << "` are not available";
      response->mutable_status()->set_message("Metrics not available for component `" +
                                              component + "`");
    } else {
      LOG(ERROR) << "GetMetrics request received for unknown component " << component;
      response->mutable_status()->set_message("Unknown component: " + component);
    }
    return response;
  }

  if (!_request.has_interval() && !_request.has_explicit_interval()) {
    LOG(ERROR) << "GetMetrics request does not have either interval"
               << " nor explicit interval";
    response->mutable_status()->set_status(proto::system::NOTOK);
    response->mutable_status()->set_message("No interval or explicit interval set");
    return response;
  }

  sp_int64 start_time = 0;
  sp_int64 end_time = 0;
  if (_request.has_interval()) {
    end_time = time(NULL);
    start_time = _request.interval() <= 0 ? 0 : end_time - _request.interval();
  } else {
    start_time = _request.explicit_interval().start();
    end_time = _request.explicit_interval().end();
  }
  found->second->GetMetrics(_request, start_time, end_time, *response);
  response->set_interval(end_time - start_time);
  return response;
}
void TMetricsCollector::GetExceptionsHelper(const ExceptionLogRequest& request,
ExceptionLogResponse& exceptions) {
auto component_metrics = metrics_[request.component_name()];
if (request.instances_size() == 0) {
component_metrics->GetAllExceptions(exceptions);
} else {
for (int i = 0; i < request.instances_size(); ++i) {
component_metrics->GetExceptionsForInstance(request.instances(i), exceptions);
}
}
}
// Returns the unaggregated exception logs for the requested component (optionally limited
// to the requested instances). Returns NOTOK "Unknown component" when nothing has been
// recorded for that component.
unique_ptr<ExceptionLogResponse> TMetricsCollector::GetExceptions(
    const ExceptionLogRequest& request) {
  auto response = make_unique<ExceptionLogResponse>();
  auto* status = response->mutable_status();
  const bool known = metrics_.find(request.component_name()) != metrics_.end();
  if (!known) {
    LOG(ERROR) << "GetExceptions request received for unknown component "
               << request.component_name();
    status->set_status(proto::system::NOTOK);
    status->set_message("Unknown component");
  } else {
    status->set_status(proto::system::OK);
    status->set_message("OK");
    GetExceptionsHelper(request, *response);
  }
  return response;
}
// Like GetExceptions(), but collapses the component's exception logs into one summary
// entry per exception class via AggregateExceptions(). Returns NOTOK "Unknown component"
// when nothing has been recorded for that component.
unique_ptr<ExceptionLogResponse> TMetricsCollector::GetExceptionsSummary(
    const ExceptionLogRequest& request) {
  auto response = make_unique<ExceptionLogResponse>();
  auto* status = response->mutable_status();
  if (metrics_.count(request.component_name()) == 0) {
    LOG(ERROR) << "GetExceptionSummary request received for unknown component "
               << request.component_name();
    status->set_status(proto::system::NOTOK);
    status->set_message("Unknown component");
    return response;
  }
  status->set_status(proto::system::OK);
  status->set_message("OK");
  // Scratch message holding the raw per-instance logs before aggregation.
  ExceptionLogResponse raw_exceptions;
  GetExceptionsHelper(request, raw_exceptions);
  AggregateExceptions(raw_exceptions, *response);
  return response;
}
// Aggregate exceptions in all_exceptions and fill up response
// (TODO: Merge aggregating exceptions based on classname and based on stack_trace (GetExceptions)
// into one function which take aggregation as argument. Modify the ExceptionRequest to
// take argument for which aggregation function to use)
void TMetricsCollector::AggregateExceptions(const ExceptionLogResponse& all_exceptions,
ExceptionLogResponse& aggregate_exceptions) {
using std::map;
using std::string;
map<string, unique_ptr<TmanagerExceptionLog>> exception_summary; // Owns exception log pointer.
for (int i = 0; i < all_exceptions.exceptions_size(); ++i) {
const TmanagerExceptionLog& log = all_exceptions.exceptions(i);
// Get classname by splitting on first colon
const std::string& stack_trace = log.stacktrace();
size_t pos = stack_trace.find_first_of(':');
if (pos != std::string::npos) {
const std::string class_name = stack_trace.substr(0, pos);
if (exception_summary.find(class_name) == exception_summary.end()) {
auto new_exception = make_unique<TmanagerExceptionLog>();
new_exception->CopyFrom(log);
new_exception->set_stacktrace(class_name);
exception_summary[class_name] = std::move(new_exception);
} else {
TmanagerExceptionLog& prev_log = *exception_summary[class_name];
prev_log.set_count(log.count() + prev_log.count());
prev_log.set_lasttime(log.lasttime());
}
}
}
for (auto summary_iter = exception_summary.begin();
summary_iter != exception_summary.end(); ++summary_iter) {
aggregate_exceptions.add_exceptions()->CopyFrom(*(summary_iter->second));
}
}
// Returns the container for `component_name`. On first use it creates an empty one, sized
// with this collector's bucket count (nintervals_) and bucket width (interval_).
shared_ptr<TMetricsCollector::ComponentMetrics> TMetricsCollector::GetOrCreateComponentMetrics(
    const sp_string& component_name) {
  auto slot = metrics_.find(component_name);
  if (slot == metrics_.end()) {
    auto created = std::make_shared<ComponentMetrics>(component_name, nintervals_, interval_);
    slot = metrics_.emplace(component_name, std::move(created)).first;
  }
  return slot->second;
}
// Creates an empty per-component container. Instances created later through
// GetOrCreateInstanceMetrics() inherit `nbuckets` and `bucket_interval`.
TMetricsCollector::ComponentMetrics::ComponentMetrics(const sp_string& component_name,
                                                      sp_int32 nbuckets,
                                                      sp_int32 bucket_interval)
    : component_name_(component_name),
      nbuckets_(nbuckets),
      bucket_interval_(bucket_interval) {}
// Defaulted: per-instance data is held by shared_ptr members.
TMetricsCollector::ComponentMetrics::~ComponentMetrics() = default;
// Rotates the time buckets of every instance in this component.
void TMetricsCollector::ComponentMetrics::Purge() {
  for (auto& instance_entry : metrics_) {
    instance_entry.second->Purge();
  }
}
// Routes one metric value to instance `instance_id`, creating the instance on first use.
void TMetricsCollector::ComponentMetrics::AddMetricForInstance(
    const sp_string& instance_id, const sp_string& name, TManagerMetrics::MetricAggregationType type,
    const sp_string& value) {
  GetOrCreateInstanceMetrics(instance_id)->AddMetricWithName(name, type, value);
}
// Routes one exception log to instance `instance_id`, creating the instance on first use.
void TMetricsCollector::ComponentMetrics::AddExceptionForInstance(
    const sp_string& instance_id, const TmanagerExceptionLog& exception) {
  GetOrCreateInstanceMetrics(instance_id)->AddExceptions(exception);
}
// Returns the container for `instance_id`. On first use it creates an empty one with this
// component's bucket count and bucket width.
shared_ptr<TMetricsCollector::InstanceMetrics>
TMetricsCollector::ComponentMetrics::GetOrCreateInstanceMetrics(const sp_string& instance_id) {
  auto slot = metrics_.find(instance_id);
  if (slot == metrics_.end()) {
    auto created = std::make_shared<InstanceMetrics>(instance_id, nbuckets_, bucket_interval_);
    slot = metrics_.emplace(instance_id, std::move(created)).first;
  }
  return slot->second;
}
void TMetricsCollector::ComponentMetrics::GetMetrics(const MetricRequest& _request,
sp_int64 start_time, sp_int64 end_time,
MetricResponse& _response) {
if (_request.instance_id_size() == 0) {
// This means that all instances need to be returned
for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
iter->second->GetMetrics(_request, start_time, end_time, _response);
if (_response.status().status() != proto::system::OK) {
return;
}
}
} else {
for (sp_int32 i = 0; i < _request.instance_id_size(); ++i) {
const sp_string& id = _request.instance_id(i);
if (metrics_.find(id) == metrics_.end()) {
LOG(ERROR) << "GetMetrics request received for unknown instance_id " << id;
_response.mutable_status()->set_status(proto::system::NOTOK);
return;
} else {
metrics_[id]->GetMetrics(_request, start_time, end_time, _response);
if (_response.status().status() != proto::system::OK) {
return;
}
}
}
}
_response.mutable_status()->set_status(proto::system::OK);
}
// Appends the stored exception logs of `instance_id` to `response`; an unknown instance
// contributes nothing.
void TMetricsCollector::ComponentMetrics::GetExceptionsForInstance(const sp_string& instance_id,
                                                                   ExceptionLogResponse& response) {
  auto found = metrics_.find(instance_id);
  if (found == metrics_.end()) return;
  found->second->GetExceptionLog(response);
}
// Appends the stored exception logs of every instance in this component to `response`.
void TMetricsCollector::ComponentMetrics::GetAllExceptions(ExceptionLogResponse& response) {
  for (auto& instance_entry : metrics_) {
    instance_entry.second->GetExceptionLog(response);
  }
}
// Creates an empty per-instance container. Each metric created later through
// GetOrCreateMetric() gets `nbuckets` buckets of width `bucket_interval`.
TMetricsCollector::InstanceMetrics::InstanceMetrics(const sp_string& instance_id,
                                                    sp_int32 nbuckets,
                                                    sp_int32 bucket_interval)
    : instance_id_(instance_id),
      nbuckets_(nbuckets),
      bucket_interval_(bucket_interval) {}
// Defaulted: metrics and exception logs are held by smart-pointer members.
TMetricsCollector::InstanceMetrics::~InstanceMetrics() = default;
// Rotates the time buckets of every metric recorded for this instance.
void TMetricsCollector::InstanceMetrics::Purge() {
  for (auto& metric_entry : metrics_) {
    metric_entry.second->Purge();
  }
}
// Adds `value` to metric `name`, creating the metric with aggregation `type` on first use.
void TMetricsCollector::InstanceMetrics::AddMetricWithName(
    const sp_string& name, common::TManagerMetrics::MetricAggregationType type,
    const sp_string& value) {
  GetOrCreateMetric(name, type)->AddValueToMetric(value);
}
// Creates a copy of exception and takes ownership of the pointer.
void TMetricsCollector::InstanceMetrics::AddExceptions(const TmanagerExceptionLog& exception) {
// TODO(kramasamy): Aggregate exceptions across minutely buckets. Try to avoid duplication of
// hash-fuction
// used to aggregate in heron-worker.
auto new_exception = make_unique<TmanagerExceptionLog>();
new_exception->CopyFrom(exception);
exceptions_.push_back(std::move(new_exception));
sp_uint32 max_exception = config::HeronInternalsConfigReader::Instance()
->GetHeronTmanagerMetricsCollectorMaximumException();
while (exceptions_.size() > max_exception) {
exceptions_.pop_front();
}
}
// Returns metric `name`. On first use it creates the metric with aggregation `type` and
// this instance's bucket layout. An existing metric is returned as-is; `type` is then
// ignored.
shared_ptr<TMetricsCollector::Metric> TMetricsCollector::InstanceMetrics::GetOrCreateMetric(
    const sp_string& name, TManagerMetrics::MetricAggregationType type) {
  auto slot = metrics_.find(name);
  if (slot == metrics_.end()) {
    auto created = std::make_shared<Metric>(name, type, nbuckets_, bucket_interval_);
    slot = metrics_.emplace(name, std::move(created)).first;
  }
  return slot->second;
}
void TMetricsCollector::InstanceMetrics::GetMetrics(const MetricRequest& request,
sp_int64 start_time, sp_int64 end_time,
MetricResponse& response) {
MetricResponse::TaskMetric* m = response.add_metric();
m->set_instance_id(instance_id_);
for (sp_int32 i = 0; i < request.metric_size(); ++i) {
const sp_string& id = request.metric(i);
if (metrics_.find(id) != metrics_.end()) {
metrics_[id]->GetMetrics(request.minutely(), start_time, end_time, m->add_metric());
}
}
}
// Appends a copy of every retained exception log of this instance to `response`, oldest
// first.
void TMetricsCollector::InstanceMetrics::GetExceptionLog(ExceptionLogResponse& response) {
  for (const auto& stored : exceptions_) {
    response.add_exceptions()->CopyFrom(*stored);
  }
}
// Creates a metric with zeroed all-time totals and `nbuckets` empty time buckets, each of
// width `bucket_interval`.
TMetricsCollector::Metric::Metric(const sp_string& name,
                                  common::TManagerMetrics::MetricAggregationType type,
                                  sp_int32 nbuckets, sp_int32 bucket_interval)
    : name_(name),
      metric_type_(type),
      all_time_cumulative_(0),
      all_time_nitems_(0),
      bucket_interval_(bucket_interval) {
  for (sp_int32 remaining = nbuckets; remaining > 0; --remaining) {
    data_.push_back(make_unique<TimeBucket>(bucket_interval_));
  }
}
// Defaulted: time buckets are held by smart pointers in `data_`.
TMetricsCollector::Metric::~Metric() = default;
// Shifts the bucket window: drops the oldest (back) bucket and opens a fresh, empty bucket
// at the front, which is where AddValueToMetric() records new values.
void TMetricsCollector::Metric::Purge() {
  data_.pop_back();
  auto fresh_bucket = make_unique<TimeBucket>(bucket_interval_);
  data_.push_front(std::move(fresh_bucket));
}
void TMetricsCollector::Metric::AddValueToMetric(const sp_string& _value) {
if (metric_type_ == common::TManagerMetrics::LAST) {
// Just keep one value per time bucket
data_.front()->data_.clear();
data_.front()->data_.push_front(_value);
// Do thsi for the cumulative as well
all_time_cumulative_ = strtod(_value.c_str(), NULL);
all_time_nitems_ = 1;
} else {
data_.front()->data_.push_front(_value);
all_time_cumulative_ += strtod(_value.c_str(), NULL);
all_time_nitems_++;
}
}
void TMetricsCollector::Metric::GetMetrics(bool minutely, sp_int64 start_time, sp_int64 end_time,
IndividualMetric* _response) {
_response->set_name(name_);
if (minutely) {
// we need minutely data
for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
TimeBucket& bucket = **iter;
// Does this time bucket have overlap with needed range
if (bucket.overlaps(start_time, end_time)) {
IntervalValue* val = _response->add_interval_values();
val->mutable_interval()->set_start(bucket.start_time_);
val->mutable_interval()->set_end(bucket.end_time_);
sp_double64 result = bucket.aggregate();
if (metric_type_ == common::TManagerMetrics::SUM) {
val->set_value(std::to_string(result));
} else if (metric_type_ == common::TManagerMetrics::AVG) {
sp_double64 avg = result / bucket.count();
val->set_value(std::to_string(avg));
} else if (metric_type_ == common::TManagerMetrics::LAST) {
val->set_value(std::to_string(result));
} else {
LOG(FATAL) << "Unknown metric type " << metric_type_;
}
}
// The timebuckets are reverse chronologically arranged
if (start_time > bucket.end_time_) break;
}
} else {
// We don't need minutely data
sp_double64 result = 0;
if (start_time <= 0) {
// We want cumulative metrics
if (metric_type_ == common::TManagerMetrics::SUM) {
result = all_time_cumulative_;
} else if (metric_type_ == common::TManagerMetrics::AVG) {
result = all_time_cumulative_ / all_time_nitems_;
} else if (metric_type_ == common::TManagerMetrics::LAST) {
result = all_time_cumulative_;
} else {
LOG(FATAL) << "Uknown metric type " << metric_type_;
}
} else {
// we want only for a specific interval
sp_int64 total_items = 0;
sp_double64 total_count = 0;
for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
TimeBucket& bucket = **iter;
// Does this time bucket have overlap with needed range
if (bucket.overlaps(start_time, end_time)) {
total_count += bucket.aggregate();
total_items += bucket.count();
if (metric_type_ == TManagerMetrics::LAST) break;
}
// The timebuckets are reverse chronologically arranged
if (start_time > bucket.end_time_) break;
}
if (metric_type_ == TManagerMetrics::SUM) {
result = total_count;
} else if (metric_type_ == TManagerMetrics::AVG) {
result = total_count / total_items;
} else if (metric_type_ == TManagerMetrics::LAST) {
result = total_count;
} else {
LOG(FATAL) << "Uknown metric type " << metric_type_;
}
}
_response->set_value(std::to_string(result));
}
}
} // namespace tmanager
} // namespace heron