blob: cc0142bdf65f740cc4b62c4727acfee3943df8f7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hotspot_partition_calculator.h"
#include <math.h>
#include <algorithm>
#include <utility>
#include <dsn/dist/fmt_logging.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/utility/flags.h>
#include <dsn/utility/error_code.h>
#include <dsn/utility/fail_point.h>
#include <dsn/dist/replication/duplication_common.h>
namespace pegasus {
namespace server {
// Upper bound on the number of per-round snapshots kept in
// _partitions_stat_histories; data_aggregate() evicts the oldest first.
DSN_DEFINE_int64("pegasus.collector",
                 max_hotspot_store_size,
                 100,
                 "the max count of historical data "
                 "stored in calculator. The FIFO "
                 "queue design is used to "
                 "eliminate outdated historical "
                 "data");
// Gates detect_hotkey_in_hotpartition() at the end of data_analyse().
DSN_DEFINE_bool("pegasus.collector",
                enable_detect_hotkey,
                false,
                "auto detect hot key in the hot partition");
// A partition whose hot-point counter value is >= this threshold is
// considered hot in the current round.
DSN_DEFINE_int32("pegasus.collector",
                 hot_partition_threshold,
                 3,
                 "threshold of hotspot partition value, if app.stat.hotspots >= "
                 "FLAGS_hot_partition_threshold, this partition is a hot partition");
// Per-partition counter threshold: the counter is incremented in hot rounds
// and decremented (floored at 0) otherwise; reaching this value triggers a
// hotkey detection request.
DSN_DEFINE_int32("pegasus.collector",
                 occurrence_threshold,
                 100,
                 "hot partition occurrence times' threshold to send rpc to detect hotkey");
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
{
while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
_partitions_stat_histories.pop_front();
}
std::vector<hotspot_partition_stat> temp;
for (const auto &partition_stat : partition_stats) {
temp.emplace_back(hotspot_partition_stat(partition_stat));
}
_partitions_stat_histories.emplace_back(temp);
}
void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
for (int data_type = 0; data_type <= 1; data_type++) {
for (int i = 0; i < partition_count; i++) {
string partition_desc =
_app_name + '.' +
(data_type == partition_qps_type::WRITE_HOTSPOT_DATA ? "write." : "read.") +
std::to_string(i);
std::string counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
std::string counter_desc =
fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i][data_type].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}
}
// Computes a "hot point" score per partition for one data type (read or write).
// The score is how many sample standard deviations the partition's QPS in the
// latest snapshot lies above the mean QPS over every partition of every stored
// snapshot. Scores are clamped at 0 and rounded up, because the perf counters
// they are published to only hold unsigned values.
//
// hot_points always ends up with one entry per partition (_hot_points.size()).
// When there are not enough samples every score is 0, which keeps the size
// invariant that update_hot_point() checks with dcheck_eq.
void hotspot_partition_calculator::stat_histories_analyse(int data_type,
                                                          std::vector<int> &hot_points)
{
    const size_t hot_point_size = _hot_points.size();
    // Default to "not hot" so the early return below still yields a full vector.
    hot_points.assign(hot_point_size, 0);

    double table_qps_sum = 0;
    int sample_count = 0;
    for (const auto &one_partition_stat_histories : _partitions_stat_histories) {
        for (const auto &partition_stat : one_partition_stat_histories) {
            table_qps_sum += partition_stat.total_qps[data_type];
            sample_count++;
        }
    }
    // The sample standard deviation divides by (sample_count - 1).
    if (sample_count <= 1) {
        ddebug("_partitions_stat_histories size <= 1, not enough data for calculation");
        return;
    }

    const double table_qps_avg = table_qps_sum / sample_count;
    double squared_deviation_sum = 0;
    for (const auto &one_partition_stat_histories : _partitions_stat_histories) {
        for (const auto &partition_stat : one_partition_stat_histories) {
            const double deviation = partition_stat.total_qps[data_type] - table_qps_avg;
            squared_deviation_sum += deviation * deviation;
        }
    }
    const double standard_deviation = sqrt(squared_deviation_sum / (sample_count - 1));

    // Only the most recent snapshot is scored; data_analyse() asserts that its
    // size matches _hot_points.
    const auto &anly_data = _partitions_stat_histories.back();
    for (size_t i = 0; i < hot_point_size; i++) {
        double hot_point = 0;
        // A zero deviation means all samples are equal: nothing stands out.
        if (standard_deviation != 0) {
            hot_point = (anly_data[i].total_qps[data_type] - table_qps_avg) / standard_deviation;
        }
        // perf_counter->set can only be unsigned uint64_t;
        // ceil guarantees any positive deviation is reported as at least 1.
        hot_points[i] = static_cast<int>(ceil(std::max(hot_point, 0.0)));
    }
}
void hotspot_partition_calculator::update_hot_point(int data_type, std::vector<int> &hot_points)
{
dcheck_eq(_hot_points.size(), hot_points.size());
int size = hot_points.size();
for (int i = 0; i < size; i++) {
_hot_points[i][data_type].get()->set(hot_points[i]);
}
}
void hotspot_partition_calculator::data_analyse()
{
dassert(_partitions_stat_histories.back().size() == _hot_points.size(),
"The number of partitions in this table has changed, and hotspot analysis cannot be "
"performed,in %s",
_app_name.c_str());
for (int data_type = 0; data_type <= 1; data_type++) {
// data_type 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA
std::vector<int> hot_points;
stat_histories_analyse(data_type, hot_points);
update_hot_point(data_type, hot_points);
}
if (!FLAGS_enable_detect_hotkey) {
return;
}
for (int data_type = 0; data_type <= 1; data_type++) {
detect_hotkey_in_hotpartition(data_type);
}
}
// For each partition, keeps a counter of how persistently it is hot for the
// given data type. Hot rounds (counter value >= FLAGS_hot_partition_threshold)
// increment the counter; other rounds decrement it, floored at 0. Once the
// counter reaches FLAGS_occurrence_threshold, a START hotkey-detection request
// is sent to that partition.
void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
{
    // Map the partition_qps_type onto the RPC's hotkey_type explicitly instead of
    // relying on both enums happening to number READ as 0.
    const bool is_read = (data_type == partition_qps_type::READ_HOTSPOT_DATA);
    const auto detect_type = is_read ? dsn::replication::hotkey_type::type::READ
                                     : dsn::replication::hotkey_type::type::WRITE;
    for (size_t index = 0; index < _hot_points.size(); index++) {
        auto &occurrence = _hotpartition_counter[index][data_type];
        if (_hot_points[index][data_type].get()->get_value() >= FLAGS_hot_partition_threshold) {
            if (++occurrence >= FLAGS_occurrence_threshold) {
                derror_f("Find a {} hot partition {}.{}",
                         (is_read ? "read" : "write"),
                         _app_name,
                         index);
                send_detect_hotkey_request(_app_name,
                                           index,
                                           detect_type,
                                           dsn::replication::detect_action::type::START);
            }
        } else {
            occurrence = std::max(occurrence - 1, 0);
        }
    }
}
// Sends a hotkey-detection request (start or stop, read or write) for partition
// partition_index of app_name to that partition's current primary replica.
// The primary is resolved through the meta server via list_app. Failures are
// logged and the call returns; nothing is thrown or returned to the caller.
/*static*/ void hotspot_partition_calculator::send_detect_hotkey_request(
    const std::string &app_name,
    const uint64_t partition_index,
    const dsn::replication::hotkey_type::type hotkey_type,
    const dsn::replication::detect_action::type action)
{
    FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});

    int app_id;
    int partition_count;
    std::vector<dsn::partition_configuration> partitions;
    const auto list_error =
        _shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions);
    // Without a successful listing, app_id is unset and partitions may be empty,
    // so indexing below would be undefined behavior.
    if (list_error != dsn::ERR_OK) {
        derror_f("Hotkey detect rpc not sent, failed to list app {}, error_hint:{}",
                 app_name,
                 list_error.to_string());
        return;
    }
    if (partition_index >= partitions.size()) {
        derror_f("Hotkey detect rpc not sent, partition index {} is out of range, {} has {} "
                 "partitions",
                 partition_index,
                 app_name,
                 partitions.size());
        return;
    }

    auto target_address = partitions[partition_index].primary;
    dsn::replication::detect_hotkey_response resp;
    dsn::replication::detect_hotkey_request req;
    req.type = hotkey_type;
    req.action = action;
    // The target replica server needs the partition id to find the replica.
    req.pid = dsn::gpid(app_id, static_cast<int32_t>(partition_index));
    auto error = _shell_context->ddl_client->detect_hotkey(target_address, req, resp);
    ddebug_f("{} {} hotkey detection in {}.{}, server address: {}",
             (action == dsn::replication::detect_action::STOP) ? "Stop" : "Start",
             (hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" : "read",
             app_name,
             partition_index,
             target_address.to_string());
    if (error != dsn::ERR_OK) {
        derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
                 app_name,
                 partition_index,
                 error.to_string());
        // resp was never filled in, so its err field carries no information.
        return;
    }
    if (resp.err != dsn::ERR_OK) {
        derror_f("Hotkey detect rpc executing failed, in {}.{}, error_hint:{} {}",
                 app_name,
                 partition_index,
                 resp.err,
                 resp.err_hint);
    }
}
} // namespace server
} // namespace pegasus