feat(hotspot): add a function to start hotkey detecting in hotspot_partition_calculator (#601)
diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index c940a2b..53ac7c7 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -4608,9 +4608,9 @@
void hotkey_detect_request::__set_type(const hotkey_type::type val) { this->type = val; }
-void hotkey_detect_request::__set_operation(const hotkey_detect_action::type val)
+void hotkey_detect_request::__set_action(const hotkey_detect_action::type val)
{
- this->operation = val;
+ this->action = val;
}
uint32_t hotkey_detect_request::read(::apache::thrift::protocol::TProtocol *iprot)
@@ -4646,8 +4646,8 @@
if (ftype == ::apache::thrift::protocol::T_I32) {
int32_t ecast135;
xfer += iprot->readI32(ecast135);
- this->operation = (hotkey_detect_action::type)ecast135;
- this->__isset.operation = true;
+ this->action = (hotkey_detect_action::type)ecast135;
+ this->__isset.action = true;
} else {
xfer += iprot->skip(ftype);
}
@@ -4674,8 +4674,8 @@
xfer += oprot->writeI32((int32_t)this->type);
xfer += oprot->writeFieldEnd();
- xfer += oprot->writeFieldBegin("operation", ::apache::thrift::protocol::T_I32, 2);
- xfer += oprot->writeI32((int32_t)this->operation);
+ xfer += oprot->writeFieldBegin("action", ::apache::thrift::protocol::T_I32, 2);
+ xfer += oprot->writeI32((int32_t)this->action);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
@@ -4687,33 +4687,33 @@
{
using ::std::swap;
swap(a.type, b.type);
- swap(a.operation, b.operation);
+ swap(a.action, b.action);
swap(a.__isset, b.__isset);
}
hotkey_detect_request::hotkey_detect_request(const hotkey_detect_request &other136)
{
type = other136.type;
- operation = other136.operation;
+ action = other136.action;
__isset = other136.__isset;
}
hotkey_detect_request::hotkey_detect_request(hotkey_detect_request &&other137)
{
type = std::move(other137.type);
- operation = std::move(other137.operation);
+ action = std::move(other137.action);
__isset = std::move(other137.__isset);
}
hotkey_detect_request &hotkey_detect_request::operator=(const hotkey_detect_request &other138)
{
type = other138.type;
- operation = other138.operation;
+ action = other138.action;
__isset = other138.__isset;
return *this;
}
hotkey_detect_request &hotkey_detect_request::operator=(hotkey_detect_request &&other139)
{
type = std::move(other139.type);
- operation = std::move(other139.operation);
+ action = std::move(other139.action);
__isset = std::move(other139.__isset);
return *this;
}
@@ -4723,7 +4723,7 @@
out << "hotkey_detect_request(";
out << "type=" << to_string(type);
out << ", "
- << "operation=" << to_string(operation);
+ << "action=" << to_string(action);
out << ")";
}
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 9b874df..fdf05f8 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -293,7 +293,7 @@
struct hotkey_detect_request {
1: hotkey_type type
- 2: hotkey_detect_action operation
+ 2: hotkey_detect_action action
}
struct hotkey_detect_response {
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 2c6a210..14683ac 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -1967,9 +1967,9 @@
typedef struct _hotkey_detect_request__isset
{
- _hotkey_detect_request__isset() : type(false), operation(false) {}
+ _hotkey_detect_request__isset() : type(false), action(false) {}
bool type : 1;
- bool operation : 1;
+ bool action : 1;
} _hotkey_detect_request__isset;
class hotkey_detect_request
@@ -1979,25 +1979,23 @@
hotkey_detect_request(hotkey_detect_request &&);
hotkey_detect_request &operator=(const hotkey_detect_request &);
hotkey_detect_request &operator=(hotkey_detect_request &&);
- hotkey_detect_request() : type((hotkey_type::type)0), operation((hotkey_detect_action::type)0)
- {
- }
+ hotkey_detect_request() : type((hotkey_type::type)0), action((hotkey_detect_action::type)0) {}
virtual ~hotkey_detect_request() throw();
hotkey_type::type type;
- hotkey_detect_action::type operation;
+ hotkey_detect_action::type action;
_hotkey_detect_request__isset __isset;
void __set_type(const hotkey_type::type val);
- void __set_operation(const hotkey_detect_action::type val);
+ void __set_action(const hotkey_detect_action::type val);
bool operator==(const hotkey_detect_request &rhs) const
{
if (!(type == rhs.type))
return false;
- if (!(operation == rhs.operation))
+ if (!(action == rhs.action))
return false;
return true;
}
diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp
index 865814e..839092d 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -21,6 +21,13 @@
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
+#include <dsn/tool-api/rpc_address.h>
+#include <dsn/tool-api/group_address.h>
+#include <dsn/utility/error_code.h>
+#include <rrdb/rrdb_types.h>
+#include <dsn/dist/replication/duplication_common.h>
+#include <dsn/tool-api/task_tracker.h>
+#include "pegasus_read_service.h"
namespace pegasus {
namespace server {
@@ -98,5 +105,52 @@
}
}
+// TODO:(TangYanzhao) call this function to start hotkey detection
+/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
+ const std::string &app_name,
+ const uint64_t partition_index,
+ const dsn::apps::hotkey_type::type hotkey_type,
+ const dsn::apps::hotkey_detect_action::type action)
+{
+ auto request = std::make_unique<dsn::apps::hotkey_detect_request>();
+ request->type = hotkey_type;
+ request->action = action;
+ ddebug_f("{} {} hotkey detection in {}.{}",
+ (action == dsn::apps::hotkey_detect_action::STOP) ? "Stop" : "Start",
+ (hotkey_type == dsn::apps::hotkey_type::WRITE) ? "write" : "read",
+ app_name,
+ partition_index);
+ dsn::rpc_address meta_server;
+ meta_server.assign_group("meta-servers");
+ std::vector<dsn::rpc_address> meta_servers;
+ replica_helper::load_meta_servers(meta_servers);
+ for (const auto &address : meta_servers) {
+ meta_server.group_address()->add(address);
+ }
+ auto cluster_name = dsn::replication::get_current_cluster_name();
+ auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str());
+ dsn::task_tracker tracker;
+ detect_hotkey_rpc rpc(
+ std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index);
+ rpc.call(resolver,
+ &tracker,
+ [app_name, partition_index](dsn::error_code error) {
+ if (error != dsn::ERR_OK) {
+ derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
+ app_name,
+ partition_index,
+ error.to_string());
+ }
+ })
+ ->wait();
+ if (rpc.response().err != dsn::ERR_OK) {
+ derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}",
+ app_name,
+ partition_index,
+ rpc.response().err,
+ rpc.response().err_hint);
+ }
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h
index c950ebe..8b13718 100644
--- a/src/server/hotspot_partition_calculator.h
+++ b/src/server/hotspot_partition_calculator.h
@@ -37,6 +37,10 @@
void data_aggregate(const std::vector<row_data> &partitions);
// analyse the saved data to find hotspot partition
void data_analyse();
+ static void send_hotkey_detect_request(const std::string &app_name,
+ const uint64_t partition_index,
+ const dsn::apps::hotkey_type::type hotkey_type,
+ const dsn::apps::hotkey_detect_action::type action);
private:
const std::string _app_name;
diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h
index 67b6a80..fe66637 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -32,6 +32,8 @@
typedef ::dsn::rpc_holder<::dsn::apps::get_scanner_request, dsn::apps::scan_response>
get_scanner_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::scan_request, dsn::apps::scan_response> scan_rpc;
+typedef ::dsn::rpc_holder<::dsn::apps::hotkey_detect_request, dsn::apps::hotkey_detect_response>
+ detect_hotkey_rpc;
class pegasus_read_service : public dsn::replication::replication_app_base,
public dsn::replication::storage_serverlet<pegasus_read_service>