blob: 839092d0197a990114f458d11fcdae2e58bcc409 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hotspot_partition_calculator.h"
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/tool-api/group_address.h>
#include <dsn/utility/error_code.h>
#include <rrdb/rrdb_types.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/tool-api/task_tracker.h>
#include "pegasus_read_service.h"
namespace pegasus {
namespace server {
// Flag `max_hotspot_store_size` in section "pegasus.collector", default 100.
// data_aggregate() in this file reads it as FLAGS_max_hotspot_store_size to
// bound how many historical snapshots stay in _partition_stat_histories; the
// oldest snapshot is dropped first.
// NOTE(review): presumably DSN_DEFINE_int64 makes this value configurable at
// startup — confirm against dsn/utility/flags.h.
DSN_DEFINE_int64("pegasus.collector",
max_hotspot_store_size,
100,
"the max count of historical data "
"stored in calculator, The FIFO "
"queue design is used to "
"eliminate outdated historical "
"data");
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
{
while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) {
_partition_stat_histories.pop();
}
std::vector<hotspot_partition_data> temp(partitions.size());
// TODO refactor the data structure
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
}
_partition_stat_histories.emplace(temp);
}
void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < partition_count; i++) {
string partition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}
void hotspot_partition_calculator::data_analyse()
{
dassert(_partition_stat_histories.back().size() == _hot_points.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
auto temp_data = _partition_stat_histories;
double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
int sample_count = 0;
while (!temp_data.empty()) {
for (const auto &partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
data_samples.push_back(partition_data.total_qps);
table_qps_sum += partition_data.total_qps;
sample_count++;
}
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("_partition_stat_histories size == 0");
return;
}
table_qps_avg = table_qps_sum / sample_count;
for (const auto &data_sample : data_samples) {
standard_deviation += pow((data_sample - table_qps_avg), 2);
}
standard_deviation = sqrt(standard_deviation / sample_count);
const auto &anly_data = _partition_stat_histories.back();
for (int i = 0; i < _hot_points.size(); i++) {
double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation;
// perf_counter->set can only be unsigned __int64
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
_hot_points[i]->set(hot_point);
}
}
// TODO:(TangYanzhao) call this function to start hotkey detection
//
// Sends a hotkey-detection request for one partition of a table and blocks
// until the RPC has completed.
//
// app_name:        table whose partition is targeted.
// partition_index: passed to the RPC as its partition hash/index argument.
// hotkey_type:     copied into the request's `type` field (logged as "write"
//                  for WRITE, "read" otherwise).
// action:          copied into the request's `action` field (logged as "Stop"
//                  for STOP, "Start" otherwise).
//
// Failures are only logged: once from the completion callback when the RPC
// itself reports an error, and once afterwards when the response carries a
// non-OK `err`.
/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
    const std::string &app_name,
    const uint64_t partition_index,
    const dsn::apps::hotkey_type::type hotkey_type,
    const dsn::apps::hotkey_detect_action::type action)
{
    auto request = std::make_unique<dsn::apps::hotkey_detect_request>();
    request->type = hotkey_type;
    request->action = action;
    ddebug_f("{} {} hotkey detection in {}.{}",
             (action == dsn::apps::hotkey_detect_action::STOP) ? "Stop" : "Start",
             (hotkey_type == dsn::apps::hotkey_type::WRITE) ? "write" : "read",
             app_name,
             partition_index);

    // The resolver is built straight from the meta server list. The separate
    // "meta-servers" group address that used to be assembled here was never
    // passed to anything, so it has been dropped.
    std::vector<dsn::rpc_address> meta_servers;
    replica_helper::load_meta_servers(meta_servers);
    auto cluster_name = dsn::replication::get_current_cluster_name();
    auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str());

    dsn::task_tracker tracker;
    detect_hotkey_rpc rpc(
        std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index);
    // wait() makes the call synchronous, so `rpc` and `tracker` outlive the
    // callback; the callback captures its log arguments by value.
    rpc.call(resolver,
             &tracker,
             [app_name, partition_index](dsn::error_code error) {
                 if (error != dsn::ERR_OK) {
                     derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
                              app_name,
                              partition_index,
                              error.to_string());
                 }
             })
        ->wait();

    if (rpc.response().err != dsn::ERR_OK) {
        derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}",
                 app_name,
                 partition_index,
                 rpc.response().err,
                 rpc.response().err_hint);
    }
}
} // namespace server
} // namespace pegasus