fix(hotspot): replace partition_resolver to ddl_client (#641)

diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp
index aea5a33..cc0142b 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -17,16 +17,13 @@
 
 #include "hotspot_partition_calculator.h"
 
-#include <algorithm>
 #include <math.h>
 #include <dsn/dist/fmt_logging.h>
+#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
 #include <dsn/utility/flags.h>
-#include <dsn/tool-api/rpc_address.h>
-#include <dsn/tool-api/group_address.h>
 #include <dsn/utility/error_code.h>
-#include <dsn/dist/replication/duplication_common.h>
-#include <dsn/tool-api/task_tracker.h>
 #include <dsn/utility/fail_point.h>
+#include <dsn/dist/replication/duplication_common.h>
 
 namespace pegasus {
 namespace server {
@@ -180,44 +177,39 @@
     const dsn::replication::detect_action::type action)
 {
     FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});
-    auto request = dsn::make_unique<dsn::replication::detect_hotkey_request>();
-    request->type = hotkey_type;
-    request->action = action;
-    ddebug_f("{} {} hotkey detection in {}.{}",
+
+    int app_id;
+    int partition_count;
+    std::vector<dsn::partition_configuration> partitions;
+    _shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions);
+
+    auto target_address = partitions[partition_index].primary;
+    dsn::replication::detect_hotkey_response resp;
+    dsn::replication::detect_hotkey_request req;
+    req.type = hotkey_type;
+    req.action = action;
+    auto error = _shell_context->ddl_client->detect_hotkey(target_address, req, resp);
+
+    ddebug_f("{} {} hotkey detection in {}.{}, server address: {}",
              (action == dsn::replication::detect_action::STOP) ? "Stop" : "Start",
              (hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" : "read",
              app_name,
-             partition_index);
-    dsn::rpc_address meta_server;
-    meta_server.assign_group("meta-servers");
-    std::vector<dsn::rpc_address> meta_servers;
-    replica_helper::load_meta_servers(meta_servers);
-    for (const auto &address : meta_servers) {
-        meta_server.group_address()->add(address);
-    }
-    auto cluster_name = dsn::replication::get_current_cluster_name();
-    // TODO: (Tangyanzhao) refactor partition_resolver to replication_ddl_client
-    auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str());
-    dsn::task_tracker tracker;
-    detect_hotkey_rpc rpc(
-        std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index);
-    rpc.call(resolver,
-             &tracker,
-             [app_name, partition_index](dsn::error_code error) {
-                 if (error != dsn::ERR_OK) {
-                     derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
-                              app_name,
-                              partition_index,
-                              error.to_string());
-                 }
-             })
-        ->wait();
-    if (rpc.response().err != dsn::ERR_OK) {
-        derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}",
+             partition_index,
+             target_address.to_string());
+
+    if (error != dsn::ERR_OK) {
+        derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
                  app_name,
                  partition_index,
-                 rpc.response().err,
-                 rpc.response().err_hint);
+                 error.to_string());
+    }
+
+    if (resp.err != dsn::ERR_OK) {
+        derror_f("Hotkey detect rpc executing failed, in {}.{}, error_hint:{} {}",
+                 app_name,
+                 partition_index,
+                 resp.err,
+                 resp.err_hint);
     }
 }
 
diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h
index 2cbc62b..ddbbb15 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -37,7 +37,9 @@
 class hotspot_partition_calculator
 {
 public:
-    hotspot_partition_calculator(const std::string &app_name, int partition_count)
+    hotspot_partition_calculator(const std::string &app_name,
+                                 int partition_count,
+                                 std::shared_ptr<shell_context> context)
         : _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
     {
         init_perf_counter(partition_count);
@@ -46,10 +48,10 @@
     void data_aggregate(const std::vector<row_data> &partitions);
     // analyse the saved data to find hotspot partition
     void data_analyse();
-    static void send_detect_hotkey_request(const std::string &app_name,
-                                           const uint64_t partition_index,
-                                           const dsn::replication::hotkey_type::type hotkey_type,
-                                           const dsn::replication::detect_action::type action);
+    void send_detect_hotkey_request(const std::string &app_name,
+                                    const uint64_t partition_index,
+                                    const dsn::replication::hotkey_type::type hotkey_type,
+                                    const dsn::replication::detect_action::type action);
 
 private:
     // empirical rule to calculate hot point of each partition
@@ -65,6 +67,7 @@
     hot_partition_counters _hot_points;
     // saving historical data can improve accuracy
     stat_histories _partitions_stat_histories;
+    std::shared_ptr<shell_context> _shell_context;
 
     // _hotpartition_counter p[index_of_partitions][type_of_read(0)/write(1)_stat]
     // it's a counter to find partitions that often exceed the threshold
diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp
index d3cbe74..2efabae 100644
--- a/src/server/info_collector.cpp
+++ b/src/server/info_collector.cpp
@@ -57,9 +57,10 @@
 
     _cluster_name = dsn::replication::get_current_cluster_name();
 
-    _shell_context.current_cluster_name = _cluster_name;
-    _shell_context.meta_list = meta_servers;
-    _shell_context.ddl_client.reset(new replication_ddl_client(meta_servers));
+    _shell_context = std::make_shared<shell_context>();
+    _shell_context->current_cluster_name = _cluster_name;
+    _shell_context->meta_list = meta_servers;
+    _shell_context->ddl_client.reset(new replication_ddl_client(meta_servers));
 
     _app_stat_interval_seconds = (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
                                                                        "app_stat_interval_seconds",
@@ -143,7 +144,7 @@
 {
     ddebug("start to stat apps");
     std::map<std::string, std::vector<row_data>> all_rows;
-    if (!get_app_partition_stat(&_shell_context, all_rows)) {
+    if (!get_app_partition_stat(_shell_context.get(), all_rows)) {
         derror("call get_app_stat() failed");
         return;
     }
@@ -241,7 +242,7 @@
 {
     ddebug("start to stat capacity unit, remaining_retry_count = %d", remaining_retry_count);
     std::vector<node_capacity_unit_stat> nodes_stat;
-    if (!get_capacity_unit_stat(&_shell_context, nodes_stat)) {
+    if (!get_capacity_unit_stat(_shell_context.get(), nodes_stat)) {
         if (remaining_retry_count > 0) {
             dwarn("get capacity unit stat failed, remaining_retry_count = %d, "
                   "wait %u seconds to retry",
@@ -288,7 +289,7 @@
 {
     ddebug("start to stat storage size, remaining_retry_count = %d", remaining_retry_count);
     app_storage_size_stat st_stat;
-    if (!get_storage_size_stat(&_shell_context, st_stat)) {
+    if (!get_storage_size_stat(_shell_context.get(), st_stat)) {
         if (remaining_retry_count > 0) {
             dwarn("get storage size stat failed, remaining_retry_count = %d, "
                   "wait %u seconds to retry",
@@ -316,7 +317,8 @@
     if (iter != _hotspot_calculator_store.end()) {
         return iter->second;
     }
-    auto calculator = std::make_shared<hotspot_partition_calculator>(app_name, partition_count);
+    auto calculator =
+        std::make_shared<hotspot_partition_calculator>(app_name, partition_count, _shell_context);
     _hotspot_calculator_store[app_name_pcount] = calculator;
     return calculator;
 }
diff --git a/src/server/info_collector.h b/src/server/info_collector.h
index 12bebdd..a7afff2 100644
--- a/src/server/info_collector.h
+++ b/src/server/info_collector.h
@@ -173,7 +173,7 @@
     dsn::task_tracker _tracker;
     ::dsn::rpc_address _meta_servers;
     std::string _cluster_name;
-    shell_context _shell_context;
+    std::shared_ptr<shell_context> _shell_context;
     uint32_t _app_stat_interval_seconds;
     ::dsn::task_ptr _app_stat_timer_task;
     ::dsn::utils::ex_lock_nr _app_stat_counter_lock;
diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp
index f41923c..645fa8a 100644
--- a/src/server/test/hotspot_partition_test.cpp
+++ b/src/server/test/hotspot_partition_test.cpp
@@ -29,7 +29,7 @@
 class hotspot_partition_test : public pegasus_server_test_base
 {
 public:
-    hotspot_partition_test() : calculator("TEST", 8)
+    hotspot_partition_test() : calculator("TEST", 8, nullptr)
     {
         dsn::fail::setup();
         dsn::fail::cfg("send_detect_hotkey_request", "return()");