blob: c326e72c60f57d46af2ea958ba3dade33899b7d5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hotkey_collector.h"

#include <cmath>

#include <dsn/dist/replication/replication_enums.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/flags.h>
#include <boost/functional/hash.hpp>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
#include "base/pegasus_key_schema.h"
namespace pegasus {
namespace server {
// Outlier threshold for the coarse (per-bucket) phase. Despite the word "variance" in the
// description, find_outlier_index() compares it against the number of standard deviations
// the heaviest bucket lies above the mean of the other buckets.
DSN_DEFINE_uint32(
"pegasus.server",
hot_bucket_variance_threshold,
3,
"the variance threshold to detect hot bucket during coarse analysis of hotkey detection");
// Outlier threshold for the fine (per-hashkey) phase; interpreted by find_outlier_index()
// as a number of standard deviations, like hot_bucket_variance_threshold.
DSN_DEFINE_uint32(
"pegasus.server",
hot_key_variance_threshold,
3,
"the variance threshold to detect hot key during fine analysis of hotkey detection");
// Number of hash buckets used by the coarse phase; get_bucket_id() maps a hash key to
// hash % hotkey_buckets_num. The validator that follows requires a prime >= 3.
// TODO: (Tangyanzhao) add a limit to avoid changing when detecting
DSN_DEFINE_uint32("pegasus.server",
hotkey_buckets_num,
37,
"the number of data capture hash buckets");
// Validate hotkey_buckets_num: it must be a prime number no smaller than 3 (the outlier
// detection needs at least 3 buckets). The lambda takes uint32_t to match the flag's
// declared type, so large values are checked by value instead of wrapping to a negative
// int32_t.
DSN_DEFINE_validator(hotkey_buckets_num, [](uint32_t bucket_num) -> bool {
    if (bucket_num < 3) {
        return false;
    }
    // trial division up to sqrt(bucket_num); `i <= n / i` avoids overflowing `i * i`
    for (uint32_t i = 2; i <= bucket_num / i; i++) {
        if (bucket_num % i == 0) {
            return false;
        }
    }
    return true;
});
// Time budget for a detection run, counted from the START rpc. terminate_if_timeout()
// compares dsn_now_s() against the start time plus this value and stops the collector once
// it is exceeded. That check is only performed while in COARSE_DETECTING, from
// hotkey_collector::analyse_data().
DSN_DEFINE_int32(
"pegasus.server",
max_seconds_to_detect_hotkey,
150,
"the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
// 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse
//
// Decide whether the largest element of `captured_keys` is an outlier. The mean and the
// sample standard deviation are computed over every element EXCEPT the largest one. The
// largest element is reported as hot when it lies at least `threshold` standard deviations
// above that mean.
//
// @param captured_keys  weights to inspect (per bucket or per hash key); at least 3
//                       elements are required, otherwise false is returned.
// @param threshold      minimum distance, in standard deviations, for the max to be hot.
// @param hot_index      out: index of the outlier when true is returned, -1 otherwise.
// @return true if the largest element is an outlier.
static bool
find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, int &hot_index)
{
    const size_t data_size = captured_keys.size();
    // The standard deviation below divides by (data_size - 2), so at least three elements
    // are needed. Fail gracefully instead of dividing by zero in release builds.
    if (data_size < 3) {
        hot_index = -1;
        return false;
    }

    // Locate the first largest element. hot_index starts at 0 so it is always a valid
    // index, even when every element is 0. hot_value keeps the full 64-bit weight.
    hot_index = 0;
    uint64_t hot_value = captured_keys[0];
    double table_captured_key_sum = 0;
    for (size_t i = 0; i < data_size; i++) {
        table_captured_key_sum += captured_keys[i];
        if (captured_keys[i] > hot_value) {
            hot_index = static_cast<int>(i);
            hot_value = captured_keys[i];
        }
    }

    // TODO: (Tangyanzhao) increase a judgment of table_captured_key_sum
    const double captured_keys_avg_count =
        (table_captured_key_sum - hot_value) / (data_size - 1);
    double squared_diff_sum = 0;
    for (size_t i = 0; i < data_size; i++) {
        if (static_cast<int>(i) != hot_index) {
            const double diff = captured_keys[i] - captured_keys_avg_count;
            squared_diff_sum += diff * diff;
        }
    }
    const double standard_deviation = std::sqrt(squared_diff_sum / (data_size - 2));
    // If all non-max elements are equal the deviation is 0. The division then yields +inf
    // when the max is above the mean (hot) or NaN when it equals it (not hot).
    const double hot_point = (hot_value - captured_keys_avg_count) / standard_deviation;
    if (hot_point >= threshold) {
        return true;
    }
    hot_index = -1;
    return false;
}
// TODO: (Tangyanzhao) replace it to xxhash
// Map a key to one of FLAGS_hotkey_buckets_num buckets by hashing its bytes.
static int get_bucket_id(dsn::string_view data)
{
    const size_t bucket_count = FLAGS_hotkey_buckets_num;
    const size_t digest = boost::hash_range(data.begin(), data.end());
    return static_cast<int>(digest % bucket_count);
}
// Create a collector for one hotkey type. It starts in the STOPPED state with no start time
// recorded. Its internal collector is a hotkey_empty_data_collector (presumably a no-op
// sink — TODO confirm), so captures before a START rpc have a valid target.
// NOTE(review): the init-list order must match the member declaration order in the header.
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(std::make_shared<hotkey_empty_data_collector>(this)),
_collector_start_time_second(0)
{
}
// Entry point of a detect_hotkey rpc: START and STOP are forwarded to their handlers. Any
// other action is rejected with ERR_INVALID_STATE and an explanatory hint.
void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
                                  dsn::replication::detect_hotkey_response &resp)
{
    if (req.action == dsn::replication::detect_action::START) {
        on_start_detect(resp);
        return;
    }
    if (req.action == dsn::replication::detect_action::STOP) {
        on_stop_detect(resp);
        return;
    }
    const std::string unknown_action_msg =
        fmt::format("{}: can't find this detect action", req.action);
    resp.err = dsn::ERR_INVALID_STATE;
    resp.__set_err_hint(unknown_action_msg);
    derror_replica(unknown_action_msg);
}
// Capture a raw (encoded) key. It is decoded with pegasus_restore_key() and only its hash
// key part is forwarded to capture_hash_key(); the sort key is discarded.
void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
{
    dsn::blob decoded_hash_key;
    dsn::blob ignored_sort_key;
    pegasus_restore_key(raw_key, decoded_hash_key, ignored_sort_key);
    capture_hash_key(decoded_hash_key, weight);
}
// Record `weight` for `hash_key` in whatever internal collector is currently installed.
void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight)
{
    // TODO: (Tangyanzhao) add a unit test to ensure data integrity
    auto *const active_collector = _internal_collector.get();
    active_collector->capture_data(hash_key, weight);
}
// Periodic analysis step. So far only the coarse phase is analysed. If the time budget is
// exhausted the collector is terminated. Otherwise, once a hot bucket is found, the state
// advances to FINE_DETECTING. Every other state is a no-op.
void hotkey_collector::analyse_data()
{
    if (_state.load() != hotkey_collector_state::COARSE_DETECTING) {
        return;
    }
    if (terminate_if_timeout()) {
        return;
    }
    _internal_collector->analyse_data(_result);
    if (_result.coarse_bucket_index == -1) {
        return;
    }
    // TODO: (Tangyanzhao) reset _internal_collector to hotkey_fine_data_collector
    _state.store(hotkey_collector_state::FINE_DETECTING);
}
// Handle a START rpc.
// - STOPPED: record the start time, install a fresh coarse collector, enter
//   COARSE_DETECTING and reply ERR_OK.
// - COARSE/FINE_DETECTING or FINISHED: reply ERR_INVALID_STATE. The reason is also set as
//   the response's err_hint so the caller can see why the request was refused; previously
//   it was only logged locally.
// - any other state is a programming error and asserts.
void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp)
{
    auto now_state = _state.load();
    std::string hint;
    switch (now_state) {
    case hotkey_collector_state::COARSE_DETECTING:
    case hotkey_collector_state::FINE_DETECTING:
        resp.err = dsn::ERR_INVALID_STATE;
        hint = fmt::format("still detecting {} hotkey, state is {}",
                           dsn::enum_to_string(_hotkey_type),
                           enum_to_string(now_state));
        resp.__set_err_hint(hint);
        dwarn_replica(hint);
        return;
    case hotkey_collector_state::FINISHED:
        resp.err = dsn::ERR_INVALID_STATE;
        hint = fmt::format(
            "{} hotkey result has been found, you can send a stop rpc to restart hotkey detection",
            dsn::enum_to_string(_hotkey_type));
        resp.__set_err_hint(hint);
        dwarn_replica(hint);
        return;
    case hotkey_collector_state::STOPPED:
        _collector_start_time_second = dsn_now_s();
        // same construction style as the constructor's empty collector
        _internal_collector = std::make_shared<hotkey_coarse_data_collector>(this);
        _state.store(hotkey_collector_state::COARSE_DETECTING);
        resp.err = dsn::ERR_OK;
        hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
        ddebug_replica(hint);
        return;
    default:
        hint = "invalid collector state";
        resp.err = dsn::ERR_INVALID_STATE;
        resp.__set_err_hint(hint);
        derror_replica(hint);
        dassert(false, "invalid collector state");
    }
}
// Handle a STOP rpc: unconditionally terminate detection and reply ERR_OK.
void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
{
    terminate();
    resp.err = dsn::ERR_OK;
    const std::string stop_msg =
        fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
    ddebug_replica(stop_msg);
}
void hotkey_collector::terminate()
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
_collector_start_time_second = 0;
}
// Terminate the collector if FLAGS_max_seconds_to_detect_hotkey have elapsed since
// detection started. Returns true when it did so, false otherwise.
bool hotkey_collector::terminate_if_timeout()
{
    const bool budget_exhausted =
        dsn_now_s() >= _collector_start_time_second + FLAGS_max_seconds_to_detect_hotkey;
    if (!budget_exhausted) {
        return false;
    }
    ddebug_replica("hotkey collector work time is exhausted but no hotkey has been found");
    terminate();
    return true;
}
// Allocate FLAGS_hotkey_buckets_num counters and explicitly zero each one before use.
hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
    : internal_collector_base(base), _hash_buckets(FLAGS_hotkey_buckets_num)
{
    for (size_t idx = 0; idx < _hash_buckets.size(); ++idx) {
        _hash_buckets[idx].store(0);
    }
}
// Add `weight` to the counter of the bucket that `hash_key` hashes into.
void hotkey_coarse_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
{
    const int bucket_id = get_bucket_id(hash_key);
    auto &bucket_counter = _hash_buckets[bucket_id];
    bucket_counter.fetch_add(weight);
}
// Snapshot and reset all bucket counters, then report the index of an outlier bucket in
// result.coarse_bucket_index, or -1 if there is none.
void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
{
    // Size the snapshot by the buckets actually allocated in the constructor instead of
    // re-reading FLAGS_hotkey_buckets_num. If the flag changed in between (see the TODO on
    // hotkey_buckets_num), re-reading it could index past the end of _hash_buckets.
    std::vector<uint64_t> buckets(_hash_buckets.size());
    for (size_t i = 0; i < buckets.size(); i++) {
        // exchange(0) reads and clears in one atomic step. Weight added by capture_data()
        // between a separate load() and store(0) would otherwise be lost.
        buckets[i] = _hash_buckets[i].exchange(0);
    }
    if (!find_outlier_index(
            buckets, FLAGS_hot_bucket_variance_threshold, result.coarse_bucket_index)) {
        result.coarse_bucket_index = -1;
    }
}
// Reset every bucket counter to zero.
void hotkey_coarse_data_collector::clear()
{
    // Iterate the buckets allocated in the constructor rather than re-reading
    // FLAGS_hotkey_buckets_num, so the loop stays in bounds even if the flag changes.
    for (auto &bucket : _hash_buckets) {
        bucket.store(0);
    }
}
// Collector for the fine phase: captures individual hash keys that fall into one bucket.
// @param base                replica this collector belongs to
// @param target_bucket_index only keys whose get_bucket_id() equals this are captured
// @param max_queue_size      passed to the capture queue (presumably its capacity — TODO
//                            confirm); also bounds how many entries one analyse_data()
//                            call dequeues
// NOTE(review): the init-list order must match the member declaration order in the header.
hotkey_fine_data_collector::hotkey_fine_data_collector(replica_base *base,
int target_bucket_index,
int max_queue_size)
: internal_collector_base(base),
_max_queue_size(max_queue_size),
_target_bucket_index(target_bucket_index),
_capture_key_queue(max_queue_size)
{
}
// Queue (hash_key, weight) if the key hashes into the target bucket (presumably the one
// chosen by the coarse phase). Other keys are ignored.
void hotkey_fine_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
{
    if (get_bucket_id(hash_key) == _target_bucket_index) {
        // Best effort: if try_enqueue fails (e.g. no room), the key is simply dropped.
        _capture_key_queue.try_enqueue(std::make_pair(hash_key, weight));
    }
}
struct blob_hash
{
std::size_t operator()(const dsn::blob &str) const
{
dsn::string_view cp(str);
return boost::hash_range(cp.begin(), cp.end());
}
};
struct blob_equal
{
std::size_t operator()(const dsn::blob &lhs, const dsn::blob &rhs) const
{
return dsn::string_view(lhs) == dsn::string_view(rhs);
}
};
// Drain captured (hash_key, weight) pairs, sum the weight per hash key and, if one key
// stands out, store it in result.hot_hash_key. result is left untouched when nothing was
// captured or no key stands out.
void hotkey_fine_data_collector::analyse_data(detect_hotkey_result &result)
{
    // hashkey -> total weight captured for it in this round
    std::unordered_map<dsn::blob, uint64_t, blob_hash, blob_equal> hash_keys_weight;
    std::pair<dsn::blob, uint64_t> key_weight_pair;
    // prevent endless loop, limit the number of elements analyzed not to exceed the queue size
    uint32_t dequeue_cnt = 0;
    while (++dequeue_cnt <= _max_queue_size && _capture_key_queue.try_dequeue(key_weight_pair)) {
        hash_keys_weight[key_weight_pair.first] += key_weight_pair.second;
    }
    if (hash_keys_weight.empty()) {
        return;
    }

    // the weight of all the collected hash keys
    std::vector<uint64_t> weights;
    weights.reserve(hash_keys_weight.size());
    // The heaviest hash key so far. It views a key owned by hash_keys_weight, which
    // outlives every use below.
    dsn::string_view weight_max_key;
    uint64_t weight_max = 0; // the max weight by far
    for (const auto &iter : hash_keys_weight) {
        weights.push_back(iter.second);
        if (iter.second > weight_max) {
            weight_max = iter.second;
            weight_max_key = iter.first;
        }
    }

    // hash_keys_weight holds the total weight of each distinct hash key captured in this
    // round. The number of distinct keys decides the hotkey determination strategy:
    // weights.size() <= 2: the hotkey must exist (the most weighted key), because
    //                      the two-level filtering significantly reduces the
    //                      possibility that the hottest key is not the actual hotkey.
    // weights.size() >= 3: use find_outlier_index to determine whether a hotkey exists
    int hot_index = -1;
    if (weights.size() < 3 ||
        find_outlier_index(weights, FLAGS_hot_key_variance_threshold, hot_index)) {
        result.hot_hash_key = std::string(weight_max_key);
    }
}
// Discard every pending entry in the capture queue.
void hotkey_fine_data_collector::clear()
{
    for (std::pair<dsn::blob, uint64_t> discarded; _capture_key_queue.try_dequeue(discarded);) {
        // nothing to do: the dequeued entry is dropped
    }
}
} // namespace server
} // namespace pegasus