blob: f3a823167d2640a1e6f816037a7b80be55203040 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pegasus_counter_reporter.h"
#include <regex>
#include <ios>
#include <iomanip>
#include <iostream>
#include <unistd.h>
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/http.h>
#include <event2/keyvalq_struct.h>
#include <dsn/cpp/service_app.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
#include "base/pegasus_utils.h"
#include "pegasus_io_service.h"
using namespace ::dsn;
// Configuration flags of the "pegasus.server" section used by the counter reporter.

// Period, in seconds, between two reports; also published as the falcon "step".
DSN_DEFINE_uint64("pegasus.server",
perf_counter_update_interval_seconds,
10,
"perf_counter_update_interval_seconds");
// When true, every report also writes the whole counter snapshot to the log.
DSN_DEFINE_bool("pegasus.server", perf_counter_enable_logging, true, "perf_counter_enable_logging");
// Destination of the counters: "prometheus" or "falcon"; any other value selects no sink.
DSN_DEFINE_string("pegasus.server", perf_counter_sink, "", "perf_counter_sink");
// Port on which the prometheus exposer listens (bound on 0.0.0.0).
DSN_DEFINE_uint64("pegasus.server", prometheus_port, 9091, "prometheus exposer port");
// Host, port and HTTP path of the falcon agent that receives the POSTed counters.
DSN_DEFINE_string("pegasus.server", falcon_host, "127.0.0.1", "falcon agent host");
DSN_DEFINE_uint64("pegasus.server", falcon_port, 1988, "falcon agent port");
DSN_DEFINE_string("pegasus.server", falcon_path, "/v1/push", "falcon agent http path");
namespace pegasus {
namespace server {
// Returns the local host name as reported by gethostname(), or an empty
// string when the call fails.
static std::string get_hostname()
{
    char hostname[1024];
    if (::gethostname(hostname, sizeof(hostname)) != 0) {
        return {};
    }
    // POSIX leaves it unspecified whether a truncated name is null-terminated,
    // so terminate explicitly before converting the buffer to a std::string.
    hostname[sizeof(hostname) - 1] = '\0';
    return hostname;
}
// Rewrites a perf counter name in place so that it only contains characters
// accepted in a prometheus metric name:
//   '@' and '#'           become ':'
//   '.', '*', '(' and ')' become '_'
// Every other character is left untouched.
static void format_metrics_name(std::string &metrics_name)
{
    for (char &ch : metrics_name) {
        switch (ch) {
        case '@':
        case '#':
            ch = ':';
            break;
        case '.':
        case '*':
        case '(':
        case ')':
            ch = '_';
            break;
        default:
            break;
        }
    }
}
// Log callback installed into libevent (see event_set_log_callback): forwards
// libevent's own messages to the rDSN logger.
//
// Severity mapping, as implemented below:
//   EVENT_LOG_DEBUG -> LOG_LEVEL_INFORMATION
//   EVENT_LOG_MSG   -> LOG_LEVEL_DEBUG
//   EVENT_LOG_WARN  -> LOG_LEVEL_WARNING
//   anything else   -> LOG_LEVEL_ERROR
// NOTE(review): presumably LOG_LEVEL_INFORMATION is the less severe of the
// first two levels in rDSN, which would explain the crossed names - confirm.
static void libevent_log(int severity, const char *msg)
{
    dsn_log_level_t level;
    if (severity == EVENT_LOG_DEBUG)
        level = LOG_LEVEL_INFORMATION;
    else if (severity == EVENT_LOG_MSG)
        level = LOG_LEVEL_DEBUG;
    else if (severity == EVENT_LOG_WARN)
        level = LOG_LEVEL_WARNING;
    else
        level = LOG_LEVEL_ERROR;
    // The message comes from libevent and may contain '%'; never use it as the
    // format string itself.
    dlog(level, "%s", msg);
}
// Builds an idle reporter: the local port and the last report time are zeroed
// and no sink is selected. start() later fills these in.
pegasus_counter_reporter::pegasus_counter_reporter()
: _local_port(0), _last_report_time_ms(0), _perf_counter_sink(perf_counter_sink_t::INVALID)
{
}
// Destruction delegates to stop(), which cancels the report timer (if any) and
// releases the prometheus exposer and registry.
pegasus_counter_reporter::~pegasus_counter_reporter() { stop(); }
// Creates the prometheus registry and an exposer bound to
// 0.0.0.0:<FLAGS_prometheus_port>, then registers the registry with the
// exposer.
void pegasus_counter_reporter::prometheus_initialize()
{
    const std::string listen_address = fmt::format("0.0.0.0:{}", FLAGS_prometheus_port);

    _registry = std::make_shared<prometheus::Registry>();
    _exposer = dsn::make_unique<prometheus::Exposer>(listen_address);
    _exposer->RegisterCollectable(_registry);

    ddebug_f("prometheus exposer [0.0.0.0:{}] started", FLAGS_prometheus_port);
}
// Fills in the fields of the falcon metric template that are the same for
// every counter: endpoint (local host), step (report interval) and tags
// (service, cluster, job and port).
void pegasus_counter_reporter::falcon_initialize()
{
    auto &metric = _falcon_metric;

    metric.tags = fmt::format(
        "service=pegasus,cluster={},job={},port={}", _cluster_name, _app_name, _local_port);
    metric.step = FLAGS_perf_counter_update_interval_seconds;
    metric.endpoint = _local_host;

    ddebug("falcon initialize: endpoint(%s), tag(%s)",
           metric.endpoint.c_str(),
           metric.tags.c_str());
}
// Starts periodic reporting. Does nothing when a report timer already exists.
//
// Under the write lock this:
//   1. records the local host/port, app name, cluster name and current time;
//   2. selects the sink from FLAGS_perf_counter_sink ("prometheus" or
//      "falcon", anything else means no sink) and initializes it;
//   3. routes libevent's log output to libevent_log;
//   4. arms the report timer with a random initial delay in
//      [1, interval] seconds, after which on_report_timer() runs.
void pegasus_counter_reporter::start()
{
    ::dsn::utils::auto_write_lock l(_lock);
    if (_report_timer != nullptr)
        return;

    rpc_address addr(dsn_primary_address());
    char buf[1000];
    pegasus::utils::addr2host(addr, buf, 1000);
    _local_host = buf;
    _local_port = addr.port();

    _app_name = dsn::service_app::current_service_app_info().full_name;
    _cluster_name = dsn::replication::get_current_cluster_name();
    _last_report_time_ms = dsn_now_ms();

    if (strcmp("prometheus", FLAGS_perf_counter_sink) == 0) {
        _perf_counter_sink = perf_counter_sink_t::PROMETHEUS;
        prometheus_initialize();
    } else if (strcmp("falcon", FLAGS_perf_counter_sink) == 0) {
        _perf_counter_sink = perf_counter_sink_t::FALCON;
        falcon_initialize();
    } else {
        _perf_counter_sink = perf_counter_sink_t::INVALID;
    }

    event_set_log_callback(libevent_log);

    // A configured interval of 0 would make the modulo below divide by zero,
    // so treat it as 1 second when picking the initial delay.
    const uint64_t interval_seconds = FLAGS_perf_counter_update_interval_seconds > 0
                                          ? FLAGS_perf_counter_update_interval_seconds
                                          : 1;

    _report_timer.reset(new boost::asio::deadline_timer(pegasus_io_service::instance().ios));
    _report_timer->expires_from_now(boost::posix_time::seconds(rand() % interval_seconds + 1));
    _report_timer->async_wait(std::bind(
        &pegasus_counter_reporter::on_report_timer, this, _report_timer, std::placeholders::_1));
}
// Stops reporting: under the write lock, cancels the pending report timer (if
// one was created) and drops the prometheus exposer and registry.
void pegasus_counter_reporter::stop()
{
    ::dsn::utils::auto_write_lock l(_lock);

    if (_report_timer) {
        _report_timer->cancel();
    }

    _exposer.reset();
    _registry.reset();
}
// Sends one batch of counters to the falcon agent.
//
// result:    request body to POST (update() passes a JSON array of metrics).
// timestamp: report time in seconds; only used for the log line here.
//
// The request goes to FLAGS_falcon_host:FLAGS_falcon_port at FLAGS_falcon_path
// via http_post_request().
void pegasus_counter_reporter::update_counters_to_falcon(const std::string &result,
int64_t timestamp)
{
ddebug("update counters to falcon with timestamp = %" PRId64, timestamp);
http_post_request(FLAGS_falcon_host,
FLAGS_falcon_port,
FLAGS_falcon_path,
"application/x-www-form-urlencoded",
result);
}
// Takes a snapshot of all perf counters and publishes it:
//   - to the log, when FLAGS_perf_counter_enable_logging is set;
//   - to the falcon agent as one JSON array, when the sink is FALCON;
//   - to the prometheus registry as gauges, when the sink is PROMETHEUS.
// Finally records the current time as the last report time.
void pegasus_counter_reporter::update()
{
    const uint64_t now = dsn_now_ms();
    const int64_t timestamp = now / 1000;

    perf_counters::instance().take_snapshot();

    if (FLAGS_perf_counter_enable_logging) {
        std::stringstream oss;
        oss << "logging perf counter(name, type, value):" << '\n';
        oss << std::fixed << std::setprecision(2);
        perf_counters::instance().iterate_snapshot(
            [&oss](const dsn::perf_counters::counter_snapshot &cs) {
                oss << "[" << cs.name << ", " << dsn_counter_type_to_string(cs.type) << ", "
                    << cs.value << "]" << '\n';
            });
        ddebug("%s", oss.str().c_str());
    }

    if (perf_counter_sink_t::FALCON == _perf_counter_sink) {
        // Serialize every counter through the shared metric template into a
        // single JSON array: [ {...}, {...}, ... ]
        std::stringstream oss;
        oss << "[";

        bool first_append = true;
        _falcon_metric.timestamp = timestamp;
        perf_counters::instance().iterate_snapshot(
            [&oss, &first_append, this](const dsn::perf_counters::counter_snapshot &cs) {
                _falcon_metric.metric = cs.name;
                _falcon_metric.value = cs.value;
                _falcon_metric.counterType = "GAUGE";

                if (!first_append)
                    oss << ",";
                _falcon_metric.encode_json_state(oss);
                first_append = false;
            });
        oss << "]";

        update_counters_to_falcon(oss.str(), timestamp);
    }

    if (perf_counter_sink_t::PROMETHEUS == _perf_counter_sink) {
        const std::string hostname = get_hostname();
        perf_counters::instance().iterate_snapshot([&hostname, this](
            const dsn::perf_counters::counter_snapshot &cs) {
            std::string metrics_name = cs.name;

            // prometheus metric names do not support characters like .*()@,
            // only ":" and "_", so rewrite the name accordingly.
            format_metrics_name(metrics_name);

            // Split a metric name like
            //   "collector_app_pegasus_app_stat_multi_put_qps:1_0_p999" or
            //   "collector_app_pegasus_app_stat_multi_put_qps:1_0"
            // app[0] = "1"    the app (app name or app id)
            // app[1] = "0"    the partition index
            // app[2] = "p999" or "", the percentile
            constexpr size_t kAppFields = 3;
            std::string app[kAppFields];

            std::list<std::string> lv;
            ::dsn::utils::split_args(metrics_name.c_str(), lv, ':');
            if (lv.empty()) {
                // Nothing usable in the name; front()/back() on an empty list
                // would be undefined behaviour, so skip this counter.
                return;
            }
            if (lv.size() > 1) {
                std::list<std::string> lv1;
                ::dsn::utils::split_args(lv.back().c_str(), lv1, '_');
                // Only the first kAppFields fields are kept: a suffix with more
                // '_'-separated parts (e.g. an app name that itself contains
                // '_') must not write past the end of app[].
                size_t i = 0;
                for (const auto &v : lv1) {
                    if (i >= kAppFields) {
                        break;
                    }
                    app[i++] = v;
                }
            }

            /**
             * deal with corner case, for example:
             *  replica*eon.replica*table.level.RPC_RRDB_RRDB_GET.latency(ns)@${table_name}.p999
             * in this case, app[0] = app name, app[1] = p999, app[2] = ""
             **/
            if ("p999" == app[1]) {
                app[2] = app[1];
                app[1].clear();
            }

            // Build (or reuse) the gauge family for this metric, then set the
            // gauge identified by the app/partition labels.
            metrics_name = lv.front() + app[2];
            auto it = _gauge_family_map.find(metrics_name);
            if (it == _gauge_family_map.end()) {
                auto &add_gauge_family = prometheus::BuildGauge()
                                             .Name(metrics_name)
                                             .Labels({{"service", "pegasus"},
                                                      {"host_name", hostname},
                                                      {"cluster", _cluster_name},
                                                      {"pegasus_job", _app_name},
                                                      {"port", std::to_string(_local_port)}})
                                             .Register(*_registry);
                it = _gauge_family_map
                         .insert(std::pair<std::string, prometheus::Family<prometheus::Gauge> *>(
                             metrics_name, &add_gauge_family))
                         .first;
            }

            auto &second_gauge = it->second->Add({{"app", app[0]}, {"partition", app[1]}});
            second_gauge.Set(cs.value);
        });
    }

    // %lld expects long long; convert explicitly instead of passing uint64_t.
    ddebug("update now_ms(%lld), last_report_time_ms(%lld)",
           static_cast<long long>(now),
           static_cast<long long>(_last_report_time_ms));
    _last_report_time_ms = now;
}
void pegasus_counter_reporter::http_post_request(const std::string &host,
int32_t port,
const std::string &path,
const std::string &contentType,
const std::string &data)
{
dinfo("start update_request, %s", data.c_str());
struct event_base *base = event_base_new();
struct evhttp_connection *conn = evhttp_connection_base_new(base, nullptr, host.c_str(), port);
struct evhttp_request *req =
evhttp_request_new(pegasus_counter_reporter::http_request_done, base);
evhttp_add_header(req->output_headers, "Host", host.c_str());
evhttp_add_header(req->output_headers, "Content-Type", contentType.c_str());
evbuffer_add(req->output_buffer, data.c_str(), data.size());
evhttp_connection_set_timeout(conn, 1);
evhttp_make_request(conn, req, EVHTTP_REQ_POST, path.c_str());
event_base_dispatch(base);
// clear;
evhttp_connection_free(conn);
event_base_free(base);
}
// Completion callback of the request issued by http_post_request().
//
// req: the finished request, or nullptr when libevent reports a failure
//      without a request object.
// arg: the event_base driving the request; its loop is asked to exit once the
//      outcome has been logged.
void pegasus_counter_reporter::http_request_done(struct evhttp_request *req, void *arg)
{
    struct event_base *event = static_cast<struct event_base *>(arg);

    if (req == nullptr) {
        derror("http post request failed: unknown reason");
    } else if (req->response_code == 0) {
        derror("http post request failed: connection refused");
    } else if (req->response_code == HTTP_OK) {
        dinfo("http post request succeed");
    } else {
        // Copy the response body into a heap-backed string: its size is chosen
        // by the remote side, so it must not be placed on the stack.
        std::string body;
        struct evbuffer *buf = evhttp_request_get_input_buffer(req);
        if (buf != nullptr) {
            const size_t len = evbuffer_get_length(buf);
            const unsigned char *raw = len > 0 ? evbuffer_pullup(buf, -1) : nullptr;
            if (raw != nullptr) {
                body.assign(reinterpret_cast<const char *>(raw), len);
            }
        }
        const char *code_line = req->response_code_line != nullptr ? req->response_code_line : "";
        derror("http post request failed: code = %u, code_line = %s, input_buffer = %s",
               req->response_code,
               code_line,
               body.c_str());
    }

    event_base_loopexit(event, 0);
}
// Handler of the report timer.
//
// timer: the timer that fired; it is re-armed here for the next round.
// ec:    result of the wait. On success the counters are reported and the
//        timer is scheduled again after
//        FLAGS_perf_counter_update_interval_seconds. A cancelled wait ends
//        the cycle silently; any other error trips an assertion.
void pegasus_counter_reporter::on_report_timer(std::shared_ptr<boost::asio::deadline_timer> timer,
                                               const boost::system::error_code &ec)
{
    // NOTICE: the following code is running out of rDSN's tls_context
    if (ec) {
        if (boost::system::errc::operation_canceled != ec) {
            dassert(false, "pegasus report timer error!!!");
        }
        return;
    }

    update();

    const auto next_delay = boost::posix_time::seconds(FLAGS_perf_counter_update_interval_seconds);
    timer->expires_from_now(next_delay);
    timer->async_wait(
        std::bind(&pegasus_counter_reporter::on_report_timer, this, timer, std::placeholders::_1));
}
} // namespace server
} // namespace pegasus