// blob: 7f2b5fd814e47a3b0e5a1004f10f61d8d4ab550b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/hotkey_collector.h"
#include <dsn/utility/rand.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/task_tracker.h>
#include "pegasus_server_test_base.h"
namespace pegasus {
namespace server {
// Declares the uint32 flag FLAGS_hotkey_buckets_num so that the tests below can read it; the
// flag itself is defined outside this file (presumably next to the hotkey collector - confirm).
DSN_DECLARE_uint32(hotkey_buckets_num);
// Produces a hash key for the collector tests.
//
// When `is_hotkey` is true, the fixed key "ThisisahotkeyThisisahotkey" is returned with a chance
// of roughly `probability` percent. In every other case the result is 20 characters drawn at
// random from a fixed set of letters, digits and punctuation.
//
// `probability` is compared against dsn::rand::next_u32(100): a value <= 0 never yields the hot
// key, and a value >= 100 is expected to always yield it (assuming next_u32(100) stays below
// 100 - confirm against rand.h).
static std::string generate_hash_key_by_random(bool is_hotkey, int probability = 100)
{
    // Compare as signed integers. Comparing the unsigned random value with `probability`
    // directly would turn a negative probability into a huge unsigned number and make every
    // key hot.
    if (is_hotkey && static_cast<int>(dsn::rand::next_u32(100)) < probability) {
        return "ThisisahotkeyThisisahotkey";
    }

    // Alphabet the random keys are built from.
    static const std::string chars("abcdefghijklmnopqrstuvwxyz"
                                   "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                   "1234567890"
                                   "!@#$%^&*()"
                                   "`~-_=+[{]}\\|;:'\",<.>/? ");

    const int key_length = 20;
    std::string result;
    result.reserve(key_length);
    for (int i = 0; i < key_length; i++) {
        result += chars[dsn::rand::next_u32(chars.size())];
    }
    return result;
}
// Checks that get_bucket_id() maps randomly generated hash keys to an index inside
// [0, FLAGS_hotkey_buckets_num).
TEST(hotkey_collector_test, get_bucket_id_test)
{
    for (int i = 0; i < 1000000; i++) {
        const int bucket_id =
            get_bucket_id(dsn::blob::create_from_bytes(generate_hash_key_by_random(false)),
                          FLAGS_hotkey_buckets_num);
        ASSERT_GE(bucket_id, 0);
        // bucket_id is non-negative at this point (ASSERT_GE returns from the test on failure),
        // so converting it avoids a signed/unsigned comparison with the uint32 flag.
        ASSERT_LT(static_cast<uint32_t>(bucket_id), FLAGS_hotkey_buckets_num);
    }
}
// Feeds find_outlier_index() several value lists with a threshold of 3 and checks both the
// returned flag and the reported index (-1 whenever no outlier is reported).
TEST(hotkey_collector_test, find_outlier_index_test)
{
    int threshold = 3;
    int hot_index;

    // Three close values: no outlier.
    ASSERT_FALSE(find_outlier_index({1, 2, 3}, threshold, hot_index));
    ASSERT_EQ(hot_index, -1);

    // One value dwarfs the others: it is reported at its position.
    ASSERT_TRUE(find_outlier_index({1, 2, 100000}, threshold, hot_index));
    ASSERT_EQ(hot_index, 2);

    // Two large values: the largest one (index 5) is the one reported.
    ASSERT_TRUE(find_outlier_index({1, 10000, 2, 3, 4, 10000000, 6}, threshold, hot_index));
    ASSERT_EQ(hot_index, 5);

    // A steadily increasing sequence: no outlier.
    ASSERT_FALSE(find_outlier_index(
        {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, threshold, hot_index));
    ASSERT_EQ(hot_index, -1);
}
// Fixture for the coarse collector test: owns a hotkey_coarse_data_collector constructed from
// the fixture's server with FLAGS_hotkey_buckets_num buckets, and a task tracker for the
// capture tasks the test enqueues.
class coarse_collector_test : public pegasus_server_test_base
{
public:
    coarse_collector_test() : coarse_collector(_server.get(), FLAGS_hotkey_buckets_num) {}

    hotkey_coarse_data_collector coarse_collector;

    // Returns true when every counter in the collector's hash buckets reads zero.
    bool empty()
    {
        for (const auto &bucket : coarse_collector._hash_buckets) {
            if (bucket.load() != 0) {
                return false;
            }
        }
        return true;
    }

    // Declared after coarse_collector, hence destroyed before it.
    // NOTE(review): keep this order - tracked tasks reference coarse_collector, and the tracker
    // presumably settles them on destruction; confirm against task_tracker.
    dsn::task_tracker _tracker;
};
// Runs the coarse collector through two phases of 1000 capture tasks each:
//   1. keys that are the hot key about 80% of the time -> a hot bucket must be reported;
//   2. purely random keys                              -> no hot bucket may be reported.
TEST_F(coarse_collector_test, coarse_collector)
{
    detect_hotkey_result result;

    // Phase 1: mostly-hot traffic.
    for (int i = 0; i < 1000; i++) {
        dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
            dsn::blob hash_key =
                dsn::blob::create_from_bytes(generate_hash_key_by_random(true, 80));
            coarse_collector.capture_data(hash_key, 1);
        });
    }
    // Wait for all capture tasks before analysing. Analysing while tasks are still pending
    // makes the outcome depend on how many of them happened to run already.
    _tracker.wait_outstanding_tasks();
    coarse_collector.analyse_data(result);
    ASSERT_NE(result.coarse_bucket_index, -1);

    // clear() must leave every bucket at zero.
    coarse_collector.clear();
    ASSERT_TRUE(empty());

    // Phase 2: random traffic only.
    for (int i = 0; i < 1000; i++) {
        dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
            dsn::blob hash_key = dsn::blob::create_from_bytes(generate_hash_key_by_random(false));
            coarse_collector.capture_data(hash_key, 1);
        });
    }
    _tracker.wait_outstanding_tasks();
    // `result` still holds the phase-1 index here, so the assertion below relies on
    // analyse_data() writing -1 back when it finds no hot bucket.
    coarse_collector.analyse_data(result);
    ASSERT_EQ(result.coarse_bucket_index, -1);
}
// Fixture for the fine collector test: builds a hotkey_fine_data_collector from the fixture's
// server, the literal 1 and max_queue_size, then points it at target_bucket_index.
// NOTE(review): the literal 1 looks like the bucket count - confirm against the collector's
// constructor.
class fine_collector_test : public pegasus_server_test_base
{
public:
    // These two are declared ahead of fine_collector because its initializer reads
    // max_queue_size; members are initialized in declaration order.
    int max_queue_size = 1000;
    int target_bucket_index = 0;
    hotkey_fine_data_collector fine_collector;

    fine_collector_test() : fine_collector(_server.get(), 1, max_queue_size)
    {
        fine_collector.change_target_bucket(target_bucket_index);
    }

    // Counts the entries currently held in the collector's capture queue.
    // The count is obtained by dequeuing, so the queue is empty once this returns.
    int now_queue_size()
    {
        std::pair<dsn::blob, uint64_t> drained_entry;
        int drained_count = 0;
        while (fine_collector._capture_key_queue.try_dequeue(drained_entry)) {
            ++drained_count;
        }
        return drained_count;
    }

    dsn::task_tracker _tracker;
};
// Runs the fine collector through three phases of capture tasks:
//   1. 1000 keys, hot about 80% of the time -> the hot key must be reported;
//   2. 1000 purely random keys              -> no hot key may be reported;
//   3. 5000 mostly-hot keys, never analysed -> the capture queue must stay bounded.
TEST_F(fine_collector_test, fine_collector)
{
    detect_hotkey_result result;

    // Phase 1: mostly-hot traffic.
    for (int i = 0; i < 1000; i++) {
        dsn::tasking::enqueue(RPC_REPLICATION_WRITE_EMPTY, &_tracker, [&] {
            dsn::blob hash_key =
                dsn::blob::create_from_bytes(generate_hash_key_by_random(true, 80));
            fine_collector.capture_data(hash_key, 1);
        });
    }
    // Wait for all capture tasks before analysing. Analysing while tasks are still pending
    // makes the outcome depend on how many of them happened to run already.
    _tracker.wait_outstanding_tasks();
    fine_collector.analyse_data(result);
    ASSERT_EQ(result.hot_hash_key, "ThisisahotkeyThisisahotkey");

    // clear() must leave nothing behind in the capture queue.
    fine_collector.clear();
    ASSERT_EQ(now_queue_size(), 0);

    // Phase 2: random traffic only. Reset the previous finding so the emptiness check below
    // reflects this phase alone.
    result.hot_hash_key = "";
    for (int i = 0; i < 1000; i++) {
        dsn::tasking::enqueue(RPC_REPLICATION_WRITE_EMPTY, &_tracker, [&] {
            dsn::blob hash_key = dsn::blob::create_from_bytes(generate_hash_key_by_random(false));
            fine_collector.capture_data(hash_key, 1);
        });
    }
    _tracker.wait_outstanding_tasks();
    fine_collector.analyse_data(result);
    ASSERT_TRUE(result.hot_hash_key.empty());

    // Phase 3: submit far more keys than max_queue_size without analysing in between and
    // check that the number of queued entries stays below twice that limit.
    for (int i = 0; i < 5000; i++) {
        dsn::tasking::enqueue(RPC_REPLICATION_WRITE_EMPTY, &_tracker, [&] {
            dsn::blob hash_key =
                dsn::blob::create_from_bytes(generate_hash_key_by_random(true, 80));
            fine_collector.capture_data(hash_key, 1);
        });
    }
    _tracker.wait_outstanding_tasks();
    ASSERT_LT(now_queue_size(), max_queue_size * 2);
}
} // namespace server
} // namespace pegasus