fix(hotspot): replace partition_resolver with ddl_client (#641)
diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp
index aea5a33..cc0142b 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -17,16 +17,13 @@
#include "hotspot_partition_calculator.h"
-#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
+#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/utility/flags.h>
-#include <dsn/tool-api/rpc_address.h>
-#include <dsn/tool-api/group_address.h>
#include <dsn/utility/error_code.h>
-#include <dsn/dist/replication/duplication_common.h>
-#include <dsn/tool-api/task_tracker.h>
#include <dsn/utility/fail_point.h>
+#include <dsn/dist/replication/duplication_common.h>
namespace pegasus {
namespace server {
@@ -180,44 +177,39 @@
const dsn::replication::detect_action::type action)
{
FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});
- auto request = dsn::make_unique<dsn::replication::detect_hotkey_request>();
- request->type = hotkey_type;
- request->action = action;
- ddebug_f("{} {} hotkey detection in {}.{}",
+
+ int app_id;
+ int partition_count;
+ std::vector<dsn::partition_configuration> partitions;
+ _shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions);
+
+ auto target_address = partitions[partition_index].primary;
+ dsn::replication::detect_hotkey_response resp;
+ dsn::replication::detect_hotkey_request req;
+ req.type = hotkey_type;
+ req.action = action;
+ auto error = _shell_context->ddl_client->detect_hotkey(target_address, req, resp);
+
+ ddebug_f("{} {} hotkey detection in {}.{}, server address: {}",
(action == dsn::replication::detect_action::STOP) ? "Stop" : "Start",
(hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" : "read",
app_name,
- partition_index);
- dsn::rpc_address meta_server;
- meta_server.assign_group("meta-servers");
- std::vector<dsn::rpc_address> meta_servers;
- replica_helper::load_meta_servers(meta_servers);
- for (const auto &address : meta_servers) {
- meta_server.group_address()->add(address);
- }
- auto cluster_name = dsn::replication::get_current_cluster_name();
- // TODO: (Tangyanzhao) refactor partition_resolver to replication_ddl_client
- auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str());
- dsn::task_tracker tracker;
- detect_hotkey_rpc rpc(
- std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index);
- rpc.call(resolver,
- &tracker,
- [app_name, partition_index](dsn::error_code error) {
- if (error != dsn::ERR_OK) {
- derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
- app_name,
- partition_index,
- error.to_string());
- }
- })
- ->wait();
- if (rpc.response().err != dsn::ERR_OK) {
- derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}",
+ partition_index,
+ target_address.to_string());
+
+ if (error != dsn::ERR_OK) {
+ derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
app_name,
partition_index,
- rpc.response().err,
- rpc.response().err_hint);
+ error.to_string());
+ }
+
+ if (resp.err != dsn::ERR_OK) {
+ derror_f("Hotkey detect rpc executing failed, in {}.{}, error_hint:{} {}",
+ app_name,
+ partition_index,
+ resp.err,
+ resp.err_hint);
}
}
diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h
index 2cbc62b..ddbbb15 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -37,7 +37,9 @@
class hotspot_partition_calculator
{
public:
- hotspot_partition_calculator(const std::string &app_name, int partition_count)
+ hotspot_partition_calculator(const std::string &app_name,
+ int partition_count,
+ std::shared_ptr<shell_context> context)
: _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
{
init_perf_counter(partition_count);
@@ -46,10 +48,10 @@
void data_aggregate(const std::vector<row_data> &partitions);
// analyse the saved data to find hotspot partition
void data_analyse();
- static void send_detect_hotkey_request(const std::string &app_name,
- const uint64_t partition_index,
- const dsn::replication::hotkey_type::type hotkey_type,
- const dsn::replication::detect_action::type action);
+ void send_detect_hotkey_request(const std::string &app_name,
+ const uint64_t partition_index,
+ const dsn::replication::hotkey_type::type hotkey_type,
+ const dsn::replication::detect_action::type action);
private:
// empirical rule to calculate hot point of each partition
@@ -65,6 +67,7 @@
hot_partition_counters _hot_points;
// saving historical data can improve accuracy
stat_histories _partitions_stat_histories;
+ std::shared_ptr<shell_context> _shell_context;
// _hotpartition_counter p[index_of_partitions][type_of_read(0)/write(1)_stat]
// it's a counter to find partitions that often exceed the threshold
diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp
index d3cbe74..2efabae 100644
--- a/src/server/info_collector.cpp
+++ b/src/server/info_collector.cpp
@@ -57,9 +57,10 @@
_cluster_name = dsn::replication::get_current_cluster_name();
- _shell_context.current_cluster_name = _cluster_name;
- _shell_context.meta_list = meta_servers;
- _shell_context.ddl_client.reset(new replication_ddl_client(meta_servers));
+ _shell_context = std::make_shared<shell_context>();
+ _shell_context->current_cluster_name = _cluster_name;
+ _shell_context->meta_list = meta_servers;
+ _shell_context->ddl_client.reset(new replication_ddl_client(meta_servers));
_app_stat_interval_seconds = (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"app_stat_interval_seconds",
@@ -143,7 +144,7 @@
{
ddebug("start to stat apps");
std::map<std::string, std::vector<row_data>> all_rows;
- if (!get_app_partition_stat(&_shell_context, all_rows)) {
+ if (!get_app_partition_stat(_shell_context.get(), all_rows)) {
derror("call get_app_stat() failed");
return;
}
@@ -241,7 +242,7 @@
{
ddebug("start to stat capacity unit, remaining_retry_count = %d", remaining_retry_count);
std::vector<node_capacity_unit_stat> nodes_stat;
- if (!get_capacity_unit_stat(&_shell_context, nodes_stat)) {
+ if (!get_capacity_unit_stat(_shell_context.get(), nodes_stat)) {
if (remaining_retry_count > 0) {
dwarn("get capacity unit stat failed, remaining_retry_count = %d, "
"wait %u seconds to retry",
@@ -288,7 +289,7 @@
{
ddebug("start to stat storage size, remaining_retry_count = %d", remaining_retry_count);
app_storage_size_stat st_stat;
- if (!get_storage_size_stat(&_shell_context, st_stat)) {
+ if (!get_storage_size_stat(_shell_context.get(), st_stat)) {
if (remaining_retry_count > 0) {
dwarn("get storage size stat failed, remaining_retry_count = %d, "
"wait %u seconds to retry",
@@ -316,7 +317,8 @@
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
- auto calculator = std::make_shared<hotspot_partition_calculator>(app_name, partition_count);
+ auto calculator =
+ std::make_shared<hotspot_partition_calculator>(app_name, partition_count, _shell_context);
_hotspot_calculator_store[app_name_pcount] = calculator;
return calculator;
}
diff --git a/src/server/info_collector.h b/src/server/info_collector.h
index 12bebdd..a7afff2 100644
--- a/src/server/info_collector.h
+++ b/src/server/info_collector.h
@@ -173,7 +173,7 @@
dsn::task_tracker _tracker;
::dsn::rpc_address _meta_servers;
std::string _cluster_name;
- shell_context _shell_context;
+ std::shared_ptr<shell_context> _shell_context;
uint32_t _app_stat_interval_seconds;
::dsn::task_ptr _app_stat_timer_task;
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp
index f41923c..645fa8a 100644
--- a/src/server/test/hotspot_partition_test.cpp
+++ b/src/server/test/hotspot_partition_test.cpp
@@ -29,7 +29,7 @@
class hotspot_partition_test : public pegasus_server_test_base
{
public:
- hotspot_partition_test() : calculator("TEST", 8)
+ hotspot_partition_test() : calculator("TEST", 8, nullptr)
{
dsn::fail::setup();
dsn::fail::cfg("send_detect_hotkey_request", "return()");