blob: 5aa1078573c79b729b43ce0580173a18f25f7fb6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __TMETRICS_COLLECTOR_H_
#define __TMETRICS_COLLECTOR_H_
#include <cstdlib>
#include <ctime>
#include <list>
#include <map>
#include <memory>
#include <string>
#include "basics/callback.h"
#include "basics/sptypes.h"
#include "network/event_loop.h"
#include "proto/tmanager.pb.h"
#include "proto/topology.pb.h"
#include "metrics/tmanager-metrics.h"
namespace heron {
namespace tmanager {
using std::unique_ptr;
using std::shared_ptr;
// Helper class to manage aggregation and serving of metrics. Metrics are logically stored as
// component_name -> {instance_id -> value}, with one entry per instance of the component.
// TODO(kramasamy): Store metrics persistently to prevent against crashes.
class TMetricsCollector {
public:
// Constructs a collector.
// '_max_interval' is how far along we keep individual metric blobs.
// 'eventLoop' is a shared handle to the event loop used by the collector.
// 'metrics_sinks_yaml' is taken as a string; presumably the location of the metrics sinks
// YAML configuration - confirm against the constructor definition.
TMetricsCollector(sp_int32 _max_interval, std::shared_ptr<EventLoop> eventLoop,
const std::string& metrics_sinks_yaml);
// Deletes all stored ComponentMetrics.
virtual ~TMetricsCollector();
// Initiated on receiving a new '_metrics' from metricsmanager. Will initiate appropriate calls
// to add metrics/exception stored in '_metrics' to the respective components.
void AddMetric(const proto::tmanager::PublishMetrics& _metrics);
// Returns a new response to fetch metrics. The request gets propagated to Component's and
// Instance's get metrics. The response is returned through a unique_ptr, so the caller
// (not the collector) owns it.
unique_ptr<proto::tmanager::MetricResponse> GetMetrics(
const proto::tmanager::MetricRequest& _request,
const proto::api::Topology& _topology);
// Returns response for fetching exceptions. The response is returned through a unique_ptr,
// so the caller (not the collector) owns it.
unique_ptr<proto::tmanager::ExceptionLogResponse> GetExceptions(
const proto::tmanager::ExceptionLogRequest& request);
// Returns exception summary response. The response is returned through a unique_ptr, so the
// caller (not the collector) owns it.
unique_ptr<proto::tmanager::ExceptionLogResponse> GetExceptionsSummary(
const proto::tmanager::ExceptionLogRequest& request);
private:
// Fetches exceptions for 'request' and saves them in 'all_exceptions'.
// 'all_exceptions' is a caller-provided object passed by reference; this method does not
// own it.
void GetExceptionsHelper(const proto::tmanager::ExceptionLogRequest& request,
proto::tmanager::ExceptionLogResponse& all_exceptions);
// Aggregate exceptions from 'all_exceptions' to 'aggregate_exceptions'.
// 'all_exceptions' is read-only input; 'aggregate_exceptions' is a caller-provided object
// passed by reference that this method does not own.
void AggregateExceptions(const proto::tmanager::ExceptionLogResponse& all_exceptions,
proto::tmanager::ExceptionLogResponse& aggregate_exceptions);
// Add metrics for 'component_name'
void AddMetricsForComponent(const sp_string& component_name,
const proto::tmanager::MetricDatum& metrics_data);
// Add exception logs for 'component_name'
void AddExceptionsForComponent(const sp_string& component_name,
const proto::tmanager::TmanagerExceptionLog& exception_log);
// Clean all metrics.
// NOTE(review): the EventLoop::Status argument suggests this is registered as an event loop
// callback - confirm against the constructor definition.
void Purge(EventLoop::Status _status);
// Timeseries of metrics. A TimeBucket holds the raw metric values that were accumulated
// during one time window.
struct TimeBucket {
  // A list of metrics each accumulated inside the time
  // that this bucket represents. Values are kept as strings and parsed on aggregation.
  std::list<sp_string> data_;
  // The start and end time that this TimeBucket contains metrics for, in the unit
  // returned by time() (seconds).
  // NOTE(review): the time() result is narrowed to sp_int32 here while overlaps() is queried
  // with sp_int64 bounds; consider widening these fields once all users have been checked.
  sp_int32 start_time_;
  sp_int32 end_time_;

  // Opens a bucket that starts now and ends 'bucket_interval' later. 'bucket_interval' is
  // added directly to the time() value, so it is expressed in the same unit.
  explicit TimeBucket(sp_int32 bucket_interval)
      : start_time_(static_cast<sp_int32>(std::time(nullptr))),
        end_time_(start_time_ + bucket_interval) {}

  // Returns true when the closed range [start_time, end_time] shares at least one point
  // with this bucket's closed range [start_time_, end_time_].
  bool overlaps(sp_int64 start_time, sp_int64 end_time) const {
    return start_time_ <= end_time && start_time <= end_time_;
  }

  // Returns the sum of all stored values parsed as doubles; 0 for an empty bucket.
  // A value strtod cannot parse contributes 0 to the sum.
  sp_double64 aggregate() const {
    sp_double64 total = 0;
    for (const auto& value : data_) {
      total += std::strtod(value.c_str(), nullptr);
    }
    return total;
  }

  // Returns the number of values stored in this bucket.
  sp_int64 count() const { return static_cast<sp_int64>(data_.size()); }
};
// Data structure to store metrics. A metric is a time series of data, kept as a list of
// TimeBucket objects.
// TODO(kramasamy): Use proto to store this data structure.
class Metric {
public:
// Creates a metric called 'name' whose values are aggregated according to 'type'.
// 'nbuckets' and 'bucket_interval' presumably give the number of TimeBuckets kept and the
// span of each one - confirm against the constructor definition.
// TODO(kramasamy): Add ctor for default UNKNOWN type and give a set type function.
Metric(const sp_string& name, common::TManagerMetrics::MetricAggregationType type,
sp_int32 nbuckets, sp_int32 bucket_interval);
// Destroys the metric. The TimeBuckets in 'data_' are held through unique_ptr.
virtual ~Metric();
// Presumably drops or rotates out-of-date TimeBuckets - see the definition for the exact
// policy.
void Purge();
// Add a new value to the end of 'data_' extending the time series.
void AddValueToMetric(const sp_string& value);
// Reports this metric through 'response' for the time range given by 'start_time' and
// 'end_time'. 'minutely' presumably selects per-bucket values instead of a single aggregated
// value - confirm against the definition. Doesn't own 'response'.
void GetMetrics(bool minutely, sp_int64 start_time, sp_int64 end_time,
proto::tmanager::MetricResponse::IndividualMetric* response);
private:
// Name of this metric.
sp_string name_;
// Time series. data_ will be ordered by their time of arrival.
std::list<unique_ptr<TimeBucket>> data_;
// Type of metric. This can be SUM or AVG. It specifies how to aggregate these metrics for
// display.
common::TManagerMetrics::MetricAggregationType metric_type_;
// Presumably the running total and the number of all values ever added to this metric -
// confirm against the definition.
sp_double64 all_time_cumulative_;
sp_int64 all_time_nitems_;
// Presumably the 'bucket_interval' given to the constructor, used when new TimeBuckets are
// created - confirm against the definition.
sp_int32 bucket_interval_;
};
// Most granular metrics/exception store level. This stores the exceptions and metrics
// associated with a single instance.
class InstanceMetrics {
public:
// ctor. 'instance_id' is the id generated by heron. 'nbuckets' is the number of metrics
// buckets stored for instances belonging to this component. 'bucket_interval' is presumably
// the span of each bucket - confirm against the constructor definition.
InstanceMetrics(const sp_string& instance_id, sp_int32 nbuckets, sp_int32 bucket_interval);
// dtor
virtual ~InstanceMetrics();
// Clear old metrics associated with this instance.
void Purge();
// Add 'value' to the metric called 'name', which is aggregated according to 'type'.
void AddMetricWithName(const sp_string& name,
common::TManagerMetrics::MetricAggregationType type,
const sp_string& value);
// Add TmanagerExceptionLog to the list of exceptions for this instance_id.
void AddExceptions(const proto::tmanager::TmanagerExceptionLog& exception);
// Reports the metrics of this instance through 'response'; 'start_time' and 'end_time'
// presumably bound the time range considered - confirm against the definition.
// Doesn't own 'response'.
void GetMetrics(const proto::tmanager::MetricRequest& request, sp_int64 start_time,
sp_int64 end_time, proto::tmanager::MetricResponse& response);
// Fills response for fetching exceptions. Doesn't own response.
void GetExceptionLog(proto::tmanager::ExceptionLogResponse& response);
private:
// Create or return existing Metric. The Metric is returned through a shared_ptr; presumably
// the same object is also kept in 'metrics_' - confirm against the definition.
shared_ptr<Metric> GetOrCreateMetric(const sp_string& name,
common::TManagerMetrics::MetricAggregationType type);
// Id of the instance these metrics belong to.
sp_string instance_id_;
// Presumably the 'nbuckets' and 'bucket_interval' constructor arguments, kept for creating
// new Metric objects - confirm against the definition.
sp_int32 nbuckets_;
sp_int32 bucket_interval_;
// map between metric name and its values
std::map<sp_string, shared_ptr<Metric>> metrics_;
// list of exceptions
std::list<unique_ptr<proto::tmanager::TmanagerExceptionLog>> exceptions_;
};
// Component level metrics. A component metrics is a map storing metrics for each of its
// instances as 'InstanceMetrics'.
class ComponentMetrics {
public:
// ctor. 'component_name' is the user supplied name given to the spout/bolt. 'nbuckets' is
// the number of buckets stored for this component. 'bucket_interval' is presumably the span
// of each bucket - confirm against the constructor definition.
ComponentMetrics(const sp_string& component_name, sp_int32 nbuckets, sp_int32 bucket_interval);
// dtor
virtual ~ComponentMetrics();
// Remove old metrics and exception associated with this spout/bolt component.
void Purge();
// Add metrics for an Instance 'instance_id' of this spout/bolt component.
void AddMetricForInstance(const sp_string& instance_id, const sp_string& name,
common::TManagerMetrics::MetricAggregationType type,
const sp_string& value);
// Add exception for an Instance 'instance_id' of this spout/bolt component.
void AddExceptionForInstance(const sp_string& instance_id,
const proto::tmanager::TmanagerExceptionLog& exception);
// Request aggregated metrics for this component; 'start_time' and 'end_time' presumably
// bound the time range considered - confirm against the definition.
// Doesn't own the 'response' object.
void GetMetrics(const proto::tmanager::MetricRequest& request, sp_int64 start_time,
sp_int64 end_time, proto::tmanager::MetricResponse& response);
// Fills 'response' with the exceptions of the instance 'instance_id'. Doesn't own response.
void GetExceptionsForInstance(const sp_string& instance_id,
proto::tmanager::ExceptionLogResponse& response);
// Fills 'response' with exceptions; presumably those of every instance of this component -
// confirm against the definition. Doesn't own response.
void GetAllExceptions(proto::tmanager::ExceptionLogResponse& response);
private:
// Create or return existing mutable InstanceMetrics associated with 'instance_id'. This
// method doesn't verify if the instance_id is valid for the component.
// The InstanceMetrics is returned through a shared_ptr; presumably the same object is also
// kept in 'metrics_' - confirm against the definition.
shared_ptr<InstanceMetrics> GetOrCreateInstanceMetrics(const sp_string& instance_id);
// Name of the spout/bolt component these metrics belong to.
sp_string component_name_;
// Presumably the 'nbuckets' and 'bucket_interval' constructor arguments, kept for creating
// new InstanceMetrics objects - confirm against the definition.
sp_int32 nbuckets_;
sp_int32 bucket_interval_;
// map between instance id and its set of metrics
std::map<sp_string, shared_ptr<InstanceMetrics>> metrics_;
};
// Create or return existing mutable ComponentMetrics associated with 'component_name'.
// The ComponentMetrics is returned through a shared_ptr; presumably the same object is also
// kept in 'metrics_' - confirm against the definition.
shared_ptr<ComponentMetrics> GetOrCreateComponentMetrics(const sp_string& component_name);
// map of component name to its metrics
std::map<sp_string, shared_ptr<ComponentMetrics>> metrics_;
// Presumably the '_max_interval' constructor argument (how far along individual metric blobs
// are kept) - confirm against the constructor definition.
sp_int32 max_interval_;
// NOTE(review): presumably the number of buckets and the span of one bucket derived from
// 'max_interval_' - confirm against the constructor definition.
sp_int32 nintervals_;
sp_int32 interval_;
// Shared handle to the event loop passed to the constructor.
std::shared_ptr<EventLoop> eventLoop_;
// Presumably a copy of the 'metrics_sinks_yaml' constructor argument.
std::string metrics_sinks_yaml_;
// Exclusively owned TManagerMetrics object; presumably built from 'metrics_sinks_yaml_' -
// confirm against the constructor definition.
std::unique_ptr<common::TManagerMetrics> tmetrics_info_;
// NOTE(review): presumably the time at which this collector was created - confirm against
// the constructor definition.
time_t start_time_;
};
} // namespace tmanager
} // namespace heron
#endif