blob: f19bf956d1627d29265f15e21f732e24eb0053c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "available_detector.h"
#include <algorithm>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/utils/time_utils.h>
#include <iomanip>
#include <sstream>
#include "base/pegasus_key_schema.h"
#include "result_writer.h"
namespace pegasus {
namespace server {
DEFINE_TASK_CODE(LPC_DETECT_AVAILABLE, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
available_detector::available_detector()
: _client(nullptr),
_ddl_client(nullptr),
_app_id(-1),
_partition_count(0),
_recent_day_detect_times(0),
_recent_day_fail_times(0),
_recent_hour_detect_times(0),
_recent_hour_fail_times(0),
_recent_minute_detect_times(0),
_recent_minute_fail_times(0)
{
// initialize information for available_detector.
_cluster_name = dsn::replication::get_current_cluster_name();
_app_name = dsn_config_get_value_string(
"pegasus.collector", "available_detect_app", "", "available detector app name");
dassert(_app_name.size() > 0, "");
_alert_script_dir = dsn_config_get_value_string("pegasus.collector",
"available_detect_alert_script_dir",
".",
"available detect alert script dir");
dassert(_alert_script_dir.size() > 0, "");
_alert_email_address = dsn_config_get_value_string(
"pegasus.collector",
"available_detect_alert_email_address",
"",
"available detect alert email address, empty means not send email");
_meta_list.clear();
dsn::replication::replica_helper::load_meta_servers(_meta_list);
dassert(_meta_list.size() > 0, "");
_detect_interval_seconds =
(uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"available_detect_interval_seconds",
3, // default value 3s
"detect interval seconds");
_alert_fail_count = (int32_t)dsn_config_get_value_uint64("pegasus.collector",
"available_detect_alert_fail_count",
30,
"available detect alert fail count");
_detect_timeout =
(uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"available_detect_timeout",
1000, // unit is millisecond,default is 1s = 1000ms
"available detect timeout");
// initialize the _client.
if (!pegasus_client_factory::initialize(nullptr)) {
dassert(false, "Initialize the pegasus client failed");
}
_client = pegasus_client_factory::get_client(_cluster_name.c_str(), _app_name.c_str());
dassert(_client != nullptr, "Initialize the _client failed");
_result_writer = dsn::make_unique<result_writer>(_client);
_ddl_client.reset(new replication_ddl_client(_meta_list));
dassert(_ddl_client != nullptr, "Initialize the _ddl_client failed");
if (!_alert_email_address.empty()) {
_send_alert_email_cmd = "cd " + _alert_script_dir + "; bash sendmail.sh alert " +
_alert_email_address + " " + _cluster_name + " " + _app_name + " ";
_send_availability_info_email_cmd = "cd " + _alert_script_dir +
"; bash sendmail.sh availability_info " +
_alert_email_address + " " + _cluster_name + " ";
}
_pfc_detect_times_day.init_app_counter("app.pegasus",
"cluster.available.detect.times.day",
COUNTER_TYPE_NUMBER,
"statistic the available detect total times every day");
_pfc_fail_times_day.init_app_counter("app.pegasus",
"cluster.available.fail.times.day",
COUNTER_TYPE_NUMBER,
"statistic the available detect fail times every day");
_pfc_available_day.init_app_counter("app.pegasus",
"cluster.available.day",
COUNTER_TYPE_NUMBER,
"statistic the availability of cluster every day");
_pfc_detect_times_hour.init_app_counter(
"app.pegasus",
"cluster.available.detect.times.hour",
COUNTER_TYPE_NUMBER,
"statistic the available detect total times every hour");
_pfc_fail_times_hour.init_app_counter("app.pegasus",
"cluster.available.fail.times.hour",
COUNTER_TYPE_NUMBER,
"statistic the available detect fail times every hour");
_pfc_available_hour.init_app_counter("app.pegasus",
"cluster.available.hour",
COUNTER_TYPE_NUMBER,
"statistic the availability of cluster every hour");
_pfc_detect_times_minute.init_app_counter(
"app.pegasus",
"cluster.available.detect.times.minute",
COUNTER_TYPE_NUMBER,
"statistic the available detect total times every minute");
_pfc_fail_times_minute.init_app_counter(
"app.pegasus",
"cluster.available.fail.times.minute",
COUNTER_TYPE_NUMBER,
"statistic the available detect fail times every minute");
_pfc_available_minute.init_app_counter("app.pegasus",
"cluster.available.minute",
COUNTER_TYPE_NUMBER,
"statistic the availability of cluster every minute");
_pfc_available_day->set(1000000); // init to 100%
_pfc_available_hour->set(1000000); // init to 100%
_pfc_available_minute->set(1000000); // init to 100%
}
available_detector::~available_detector() = default;
void available_detector::start()
{
// available detector delay 60s to wait the pegasus finishing the initialization.
_detect_timer = ::dsn::tasking::enqueue(LPC_DETECT_AVAILABLE,
&_tracker,
std::bind(&available_detector::detect_available, this),
0,
std::chrono::minutes(1));
report_availability_info();
}
void available_detector::stop() { _tracker.cancel_outstanding_tasks(); }
void available_detector::detect_available()
{
if (!generate_hash_keys()) {
derror("initialize hash_keys failed, do not detect available, retry after 60 seconds");
_detect_timer =
::dsn::tasking::enqueue(LPC_DETECT_AVAILABLE,
&_tracker,
std::bind(&available_detector::detect_available, this),
0,
std::chrono::minutes(1));
return;
}
_detect_tasks.clear();
_detect_tasks.resize(_partition_count);
_fail_count.resize(_partition_count);
// initialize detect_times to zero, this is used to generate distinct value.
for (auto i = 0; i < _partition_count; i++) {
_fail_count[i].reset(new std::atomic<int32_t>(0));
_fail_count[i]->store(0);
auto call_func = std::bind(&available_detector::on_detect, this, i);
_detect_tasks[i] =
::dsn::tasking::enqueue_timer(LPC_DETECT_AVAILABLE,
&_tracker,
std::move(call_func),
std::chrono::seconds(_detect_interval_seconds));
}
}
std::string get_date_to_string(uint64_t now_ms)
{
char buf[50] = {'\0'};
::dsn::utils::time_ms_to_date(now_ms, buf, 50);
return std::string(buf);
}
std::string get_hour_to_string(uint64_t now_ms)
{
char buf[50] = {'\0'};
::dsn::utils::time_ms_to_date_time(now_ms, buf, 50);
return std::string(buf, 13);
}
std::string get_minute_to_string(uint64_t now_ms)
{
char buf[50] = {'\0'};
::dsn::utils::time_ms_to_date_time(now_ms, buf, 50);
return std::string(buf, 16);
}
void available_detector::report_availability_info()
{
uint64_t now_ms = dsn_now_ms();
_old_day = get_date_to_string(now_ms);
_old_hour = get_hour_to_string(now_ms);
_old_minute = get_minute_to_string(now_ms);
auto call_func = [this]() {
uint64_t now_ms = dsn_now_ms();
std::string new_day = get_date_to_string(now_ms);
std::string new_hour = get_hour_to_string(now_ms);
std::string new_minute = get_minute_to_string(now_ms);
if (new_day != _old_day) {
this->on_day_report();
_old_day = new_day;
}
if (new_hour != _old_hour) {
this->on_hour_report();
_old_hour = new_hour;
}
if (new_minute != _old_minute) {
this->on_minute_report();
_old_minute = new_minute;
}
};
_report_task = ::dsn::tasking::enqueue_timer(
LPC_DETECT_AVAILABLE,
&_tracker,
std::move(call_func),
std::chrono::minutes(1),
0,
std::chrono::minutes(2) // waiting for pegasus finishing start.
);
}
bool available_detector::generate_hash_keys()
{
// get app_id and partition_count.
auto err = _ddl_client->list_app(_app_name, _app_id, _partition_count, partitions);
if (err == ::dsn::ERR_OK && _app_id >= 0) {
_hash_keys.clear();
for (auto pidx = 0; pidx < _partition_count; pidx++) {
std::string base_key = "detect_available_p" + std::to_string(pidx) + "_";
// loop to generate the key that can fall into pidx-th partition.
int32_t base_idx = 0;
while (true) {
++base_idx;
std::string key = base_key + std::to_string(base_idx);
::dsn::blob my_hash_key(key.c_str(), 0, key.length());
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, my_hash_key, ::dsn::blob());
if (pegasus_key_hash(tmp_key) % _partition_count == pidx) {
_hash_keys.emplace_back(key);
break;
}
}
}
return true;
} else {
dwarn("Get partition count of table '%s' on cluster '%s' failed",
_app_name.c_str(),
_cluster_name.c_str());
return false;
}
}
void available_detector::on_detect(int32_t idx)
{
if (idx == 0) {
ddebug("detecting table[%s] with app_id[%d] and partition_count[%d] on cluster[%s], "
"recent_day_detect_times(%" PRId64 "), recent_day_fail_times(%" PRId64 "), "
"recent_hour_detect_times(%" PRId64 "), recent_hour_fail_times(%" PRId64 ") "
"recent_minute_detect_times(%" PRId64 "), recent_minute_fail_times(%" PRId64 ")",
_app_name.c_str(),
_app_id,
_partition_count,
_cluster_name.c_str(),
_recent_day_detect_times.load(),
_recent_day_fail_times.load(),
_recent_hour_detect_times.load(),
_recent_hour_fail_times.load(),
_recent_minute_detect_times.load(),
_recent_minute_fail_times.load());
}
dinfo("available_detector begin to detect partition[%d] of table[%s] with id[%d] on the "
"cluster[%s]",
idx,
_app_name.c_str(),
_app_id,
_cluster_name.c_str());
auto time = dsn_now_ms();
std::string value = "detect_value_" + std::to_string((time / 1000));
_recent_day_detect_times.fetch_add(1);
_recent_hour_detect_times.fetch_add(1);
_recent_minute_detect_times.fetch_add(1);
// define async_get callback function.
auto async_get_callback =
[this, idx](int err, std::string &&_value, pegasus_client::internal_info &&info) {
std::atomic<int> &cnt = (*_fail_count[idx]);
if (err != PERR_OK) {
int prev = cnt.fetch_add(1);
_recent_day_fail_times.fetch_add(1);
_recent_hour_fail_times.fetch_add(1);
_recent_minute_fail_times.fetch_add(1);
derror("async_get partition[%d] fail, fail_count = %d, hash_key = %s, error = %s",
idx,
prev + 1,
_hash_keys[idx].c_str(),
_client->get_error_string(err));
check_and_send_email(&cnt, idx);
} else {
cnt.store(0);
dinfo("async_get partition[%d] ok, hash_key = %s, value = %s",
idx,
_hash_keys[idx].c_str(),
_value.c_str());
}
};
// define async_set callback function.
auto async_set_callback =
[ this, idx, user_async_get_callback = std::move(async_get_callback) ](
int err, pegasus_client::internal_info &&info)
{
std::atomic<int> &cnt = (*_fail_count[idx]);
if (err != PERR_OK) {
int prev = cnt.fetch_add(1);
_recent_day_fail_times.fetch_add(1);
_recent_hour_fail_times.fetch_add(1);
_recent_minute_fail_times.fetch_add(1);
derror("async_set partition[%d] fail, fail_count = %d, hash_key = %s , error = %s",
idx,
prev + 1,
_hash_keys[idx].c_str(),
_client->get_error_string(err));
check_and_send_email(&cnt, idx);
} else {
dinfo("async_set partition[%d] ok, hash_key = %s", idx, _hash_keys[idx].c_str());
_client->async_get(
_hash_keys[idx], "", std::move(user_async_get_callback), _detect_timeout);
}
};
_client->async_set(_hash_keys[idx], "", value, std::move(async_set_callback), _detect_timeout);
}
void available_detector::check_and_send_email(std::atomic<int> *cnt, int32_t idx)
{
bool send_email = false;
if (cnt->load() >= _alert_fail_count) {
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_alert_lock);
if (cnt->load() >= _alert_fail_count) {
for (auto i = 0; i < _partition_count; i++) {
std::atomic<int> &c = (*_fail_count[i]);
c.store(0);
}
send_email = true;
}
}
if (send_email) {
ddebug("start to send alert email, partition_index = %d", idx);
if (_send_alert_email_cmd.empty()) {
ddebug("ignore sending alert email because email address is not set, "
"partition_index = %d",
idx);
} else {
int r = system((_send_alert_email_cmd + std::to_string(idx)).c_str());
if (r == 0) {
ddebug("send alert email done, partition_index = %d", idx);
} else {
derror("send alert email failed, partition_index = %d, "
"command_return = %d",
idx,
r);
}
}
}
}
void available_detector::on_day_report()
{
ddebug("start to report on new day, last_day = %s", _old_day.c_str());
int64_t detect_times = _recent_day_detect_times.fetch_and(0);
int64_t fail_times = _recent_day_fail_times.fetch_and(0);
int64_t succ_times = std::max(0L, detect_times - fail_times);
int64_t available = 0;
std::string hash_key("detect_available_day");
std::string sort_key(_old_day);
std::string value("0,0,0");
if (detect_times > 0) {
available = (int64_t)((double)succ_times / detect_times * 1000000);
std::ostringstream oss;
oss << detect_times << ',' << succ_times << ',' << available;
value = oss.str();
}
_pfc_detect_times_day->set(detect_times);
_pfc_fail_times_day->set(fail_times);
_pfc_available_day->set(available);
ddebug("start to send availability email, date = %s", _old_day.c_str());
if (_send_availability_info_email_cmd.empty()) {
ddebug("ignore sending availability email because email address is not set, "
"date = %s, total_detect_times = %u, total_fail_times = %u",
_old_day.c_str(),
detect_times,
fail_times);
} else {
int r = system((_send_availability_info_email_cmd + std::to_string(detect_times) + " " +
std::to_string(fail_times) + " " + _old_day)
.c_str());
if (r == 0) {
ddebug("send availability email done, date = %s, "
"total_detect_times = %u, total_fail_times = %u",
_old_day.c_str(),
detect_times,
fail_times);
} else {
derror("send availability email fail, date = %s, "
"total_detect_times = %u, total_fail_times = %u, command_return = %d",
_old_day.c_str(),
detect_times,
fail_times,
r);
}
}
_result_writer->set_result(hash_key, sort_key, value);
}
void available_detector::on_hour_report()
{
ddebug("start to report on new hour, last_hour = %s", _old_hour.c_str());
int64_t detect_times = _recent_hour_detect_times.fetch_and(0);
int64_t fail_times = _recent_hour_fail_times.fetch_and(0);
int64_t succ_times = std::max(0L, detect_times - fail_times);
int64_t available = 0;
std::string hash_key("detect_available_hour");
std::string sort_key(_old_hour);
std::string value("0,0,0");
if (detect_times > 0) {
available = (int64_t)((double)succ_times / detect_times * 1000000);
std::ostringstream oss;
oss << detect_times << ',' << succ_times << ',' << available;
value = oss.str();
}
_pfc_detect_times_hour->set(detect_times);
_pfc_fail_times_hour->set(fail_times);
_pfc_available_hour->set(available);
_result_writer->set_result(hash_key, sort_key, value);
}
void available_detector::on_minute_report()
{
ddebug("start to report on new minute, last_minute = %s", _old_minute.c_str());
int64_t detect_times = _recent_minute_detect_times.fetch_and(0);
int64_t fail_times = _recent_minute_fail_times.fetch_and(0);
int64_t succ_times = std::max(0L, detect_times - fail_times);
int64_t available = 0;
std::string hash_key("detect_available_minute");
std::string sort_key(_old_minute);
std::string value("0,0,0");
if (detect_times > 0) {
available = (int64_t)((double)succ_times / detect_times * 1000000);
std::ostringstream oss;
oss << detect_times << ',' << succ_times << ',' << available;
value = oss.str();
}
_pfc_detect_times_minute->set(detect_times);
_pfc_fail_times_minute->set(fail_times);
_pfc_available_minute->set(available);
_result_writer->set_result(hash_key, sort_key, value);
}
} // namespace server
} // namespace pegasus