blob: a457a1e8222b77023ad035f36901605eee47c860 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "capacity_unit_calculator.h"
#include <dsn/utility/config_api.h>
#include <rocksdb/status.h>
#include "hotkey_collector.h"
namespace pegasus {
namespace server {
// Builds the capacity-unit calculator of one replica.
//
// The constructor
//   1. checks that both hotkey collectors are non-null,
//   2. reads the read/write capacity-unit sizes from the "pegasus.server"
//      config section (4KB each by default) and derives the shift amounts
//      later used by add_read_cu() / add_write_cu(),
//   3. registers the perf counters, whose names are suffixed with the gpid of
//      the replica.
//
// Parameters:
//   r                      - replica this calculator is attached to.
//   read_hotkey_collector  - collector fed by read requests, must not be null.
//   write_hotkey_collector - collector fed by write requests, must not be null.
capacity_unit_calculator::capacity_unit_calculator(
    replica_base *r,
    std::shared_ptr<hotkey_collector> read_hotkey_collector,
    std::shared_ptr<hotkey_collector> write_hotkey_collector)
    : replica_base(r),
      _read_hotkey_collector(read_hotkey_collector),
      _write_hotkey_collector(write_hotkey_collector)
{
    dassert(_read_hotkey_collector != nullptr, "read hotkey collector is a nullptr");
    dassert(_write_hotkey_collector != nullptr, "write hotkey collector is a nullptr");

    _read_capacity_unit_size =
        dsn_config_get_value_uint64("pegasus.server",
                                    "perf_counter_read_capacity_unit_size",
                                    4 * 1024,
                                    "capacity unit size of read requests, default 4KB");
    _write_capacity_unit_size =
        dsn_config_get_value_uint64("pegasus.server",
                                    "perf_counter_write_capacity_unit_size",
                                    4 * 1024,
                                    "capacity unit size of write requests, default 4KB");

    // The explicit "> 0" guards against a configured size of 0 slipping through
    // the power-of-2 test (glibc's powerof2() macro evaluates to true for 0),
    // which would make the capacity-unit computation meaningless.
    dassert(_read_capacity_unit_size > 0 && powerof2(_read_capacity_unit_size),
            "'perf_counter_read_capacity_unit_size' must be a power of 2");
    dassert(_write_capacity_unit_size > 0 && powerof2(_write_capacity_unit_size),
            "'perf_counter_write_capacity_unit_size' must be a power of 2");

    // Exact integer log2 of a power of two. Counting shifts avoids the rounding
    // error that a floating-point log(x) / log(2) followed by truncation can
    // introduce.
    const auto exact_log2 = [](uint64_t value) {
        uint32_t shift = 0;
        while (value > 1) {
            value >>= 1;
            ++shift;
        }
        return shift;
    };
    _log_read_cu_size = exact_log2(_read_capacity_unit_size);
    _log_write_cu_size = exact_log2(_write_capacity_unit_size);

    std::string str_gpid = r->get_gpid().to_string();
    char name[256];

    snprintf(name, sizeof(name), "recent.read.cu@%s", str_gpid.c_str());
    _pfc_recent_read_cu.init_app_counter("app.pegasus",
                                         name,
                                         COUNTER_TYPE_VOLATILE_NUMBER,
                                         "statistic the recent read capacity units");

    snprintf(name, sizeof(name), "recent.write.cu@%s", str_gpid.c_str());
    _pfc_recent_write_cu.init_app_counter("app.pegasus",
                                          name,
                                          COUNTER_TYPE_VOLATILE_NUMBER,
                                          "statistic the recent write capacity units");

    snprintf(name, sizeof(name), "get_bytes@%s", str_gpid.c_str());
    _pfc_get_bytes.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the get bytes");

    snprintf(name, sizeof(name), "multi_get_bytes@%s", str_gpid.c_str());
    _pfc_multi_get_bytes.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes");

    snprintf(name, sizeof(name), "scan_bytes@%s", str_gpid.c_str());
    _pfc_scan_bytes.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes");

    snprintf(name, sizeof(name), "put_bytes@%s", str_gpid.c_str());
    _pfc_put_bytes.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the put bytes");

    snprintf(name, sizeof(name), "multi_put_bytes@%s", str_gpid.c_str());
    _pfc_multi_put_bytes.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi put bytes");

    snprintf(name, sizeof(name), "check_and_set_bytes@%s", str_gpid.c_str());
    _pfc_check_and_set_bytes.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and set bytes");

    snprintf(name, sizeof(name), "check_and_mutate_bytes@%s", str_gpid.c_str());
    _pfc_check_and_mutate_bytes.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes");
}
// Converts `read_data_size` bytes into read capacity units, adds them to the
// recent-read-CU counter and returns the number of units charged.
//
// A positive size is rounded up to whole units of `_read_capacity_unit_size`
// (the division is done as a right shift by `_log_read_cu_size`); a size that
// is zero or negative is charged exactly one unit.
int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
{
    int64_t charged_units = 1;
    if (read_data_size > 0) {
        charged_units = (read_data_size + _read_capacity_unit_size - 1) >> _log_read_cu_size;
    }
    _pfc_recent_read_cu->add(charged_units);
    return charged_units;
}
// Converts `write_data_size` bytes into write capacity units, adds them to the
// recent-write-CU counter and returns the number of units charged.
//
// A positive size is rounded up to whole units of `_write_capacity_unit_size`
// (the division is done as a right shift by `_log_write_cu_size`); a size that
// is zero or negative is charged exactly one unit.
int64_t capacity_unit_calculator::add_write_cu(int64_t write_data_size)
{
    int64_t charged_units = 1;
    if (write_data_size > 0) {
        charged_units = (write_data_size + _write_capacity_unit_size - 1) >> _log_write_cu_size;
    }
    _pfc_recent_write_cu->add(charged_units);
    return charged_units;
}
// Accounts a single-key get.
//
// The key + value size always goes into the get-bytes counter. Read CUs are
// charged only for kOk (by key + value size) and kNotFound (size 1); in both of
// those cases the key is also reported to the read hotkey collector. Any other
// status charges nothing.
void capacity_unit_calculator::add_get_cu(int32_t status,
                                          const dsn::blob &key,
                                          const dsn::blob &value)
{
    const auto kv_bytes = key.size() + value.size();
    _pfc_get_bytes->add(kv_bytes);

    switch (status) {
    case rocksdb::Status::kOk:
        add_read_cu(kv_bytes);
        break;
    case rocksdb::Status::kNotFound:
        add_read_cu(1);
        break;
    default:
        // Failed reads are neither charged nor sampled for hot keys.
        return;
    }
    _read_hotkey_collector->capture_raw_key(key, 1);
}
// Accounts a multi-get on `hash_key` that produced `kvs`.
//
// The multi-get-bytes counter always receives the hash key size once plus the
// size of every returned key/value. Read CUs are charged only for kOk,
// kNotFound, kIncomplete and kInvalidArgument: kNotFound is charged with size 1,
// the others with a size that counts the hash key once per returned entry. When
// CUs are charged, the hash key is reported to the read hotkey collector with
// the number of entries as weight.
void capacity_unit_calculator::add_multi_get_cu(int32_t status,
                                                const dsn::blob &hash_key,
                                                const std::vector<::dsn::apps::key_value> &kvs)
{
    int64_t payload_bytes = 0; // sort keys + values only
    int64_t charged_bytes = 0; // hash key counted once per entry
    for (const auto &entry : kvs) {
        const auto entry_bytes = entry.key.size() + entry.value.size();
        payload_bytes += entry_bytes;
        charged_bytes += hash_key.size() + entry_bytes;
    }
    _pfc_multi_get_bytes->add(hash_key.size() + payload_bytes);

    const bool not_found = (status == rocksdb::Status::kNotFound);
    const bool chargeable = not_found || status == rocksdb::Status::kOk ||
                            status == rocksdb::Status::kIncomplete ||
                            status == rocksdb::Status::kInvalidArgument;
    if (!chargeable) {
        return;
    }

    const uint64_t key_count = kvs.size();
    if (not_found) {
        add_read_cu(1);
    } else {
        add_read_cu(charged_bytes);
    }
    _read_hotkey_collector->capture_hash_key(hash_key, key_count);
}
// Accounts a scan that produced `kvs`.
//
// The scan-bytes counter is updated for every call, before the status filter,
// so that it follows the same rule as the byte counters fed by add_get_cu() and
// add_multi_get_cu(). Read CUs are charged only for kOk, kNotFound, kIncomplete
// and kInvalidArgument: kNotFound is charged with size 1, the others with the
// total key + value size of `kvs`. Any other status charges nothing.
void capacity_unit_calculator::add_scan_cu(int32_t status,
                                           const std::vector<::dsn::apps::key_value> &kvs)
{
    int64_t data_size = 0;
    for (const auto &kv : kvs) {
        data_size += kv.key.size() + kv.value.size();
    }
    _pfc_scan_bytes->add(data_size);

    if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
        status != rocksdb::Status::kIncomplete && status != rocksdb::Status::kInvalidArgument) {
        return;
    }

    if (status == rocksdb::Status::kNotFound) {
        add_read_cu(1);
        return;
    }

    // TODO: (Tangyanzhao) hotkey detect in scan
    add_read_cu(data_size);
}
// Accounts a sortkey-count request on `hash_key`: for kOk and kNotFound a read
// of size 1 is charged and the hash key is reported to the read hotkey
// collector; any other status is ignored.
void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key)
{
    const bool chargeable =
        (status == rocksdb::Status::kOk) || (status == rocksdb::Status::kNotFound);
    if (chargeable) {
        add_read_cu(1);
        _read_hotkey_collector->capture_hash_key(hash_key, 1);
    }
}
// Accounts a ttl request on `key`: for kOk and kNotFound a read of size 1 is
// charged and the key is reported to the read hotkey collector; any other
// status is ignored.
void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key)
{
    const bool chargeable =
        (status == rocksdb::Status::kOk) || (status == rocksdb::Status::kNotFound);
    if (chargeable) {
        add_read_cu(1);
        _read_hotkey_collector->capture_raw_key(key, 1);
    }
}
// Accounts a single-key put.
//
// The key + value size always goes into the put-bytes counter. Only a kOk
// status charges write CUs (by key + value size) and reports the key to the
// write hotkey collector.
void capacity_unit_calculator::add_put_cu(int32_t status,
                                          const dsn::blob &key,
                                          const dsn::blob &value)
{
    const auto kv_bytes = key.size() + value.size();
    _pfc_put_bytes->add(kv_bytes);

    if (status == rocksdb::Status::kOk) {
        add_write_cu(kv_bytes);
        _write_hotkey_collector->capture_raw_key(key, 1);
    }
}
// Accounts a single-key remove: only a kOk status charges write CUs (by key
// size) and reports the key to the write hotkey collector.
void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &key)
{
    if (status == rocksdb::Status::kOk) {
        add_write_cu(key.size());
        _write_hotkey_collector->capture_raw_key(key, 1);
    }
}
// Accounts a multi-put of `kvs` under `hash_key`.
//
// The multi-put-bytes counter always receives the hash key size once plus the
// size of every key/value, and the hash key is always reported to the write
// hotkey collector with the number of entries as weight. Write CUs are charged
// only for kOk, with a size that counts the hash key once per entry.
//
// NOTE(review): unlike add_put_cu / add_remove_cu / add_multi_remove_cu, the
// hotkey capture here happens before the status check, i.e. also for failed
// requests - confirm whether that is intended.
void capacity_unit_calculator::add_multi_put_cu(int32_t status,
                                                const dsn::blob &hash_key,
                                                const std::vector<::dsn::apps::key_value> &kvs)
{
    int64_t payload_bytes = 0; // sort keys + values only
    int64_t charged_bytes = 0; // hash key counted once per entry
    for (const auto &entry : kvs) {
        const auto entry_bytes = entry.key.size() + entry.value.size();
        payload_bytes += entry_bytes;
        charged_bytes += hash_key.size() + entry_bytes;
    }
    _pfc_multi_put_bytes->add(hash_key.size() + payload_bytes);

    const uint64_t key_count = kvs.size();
    _write_hotkey_collector->capture_hash_key(hash_key, key_count);

    if (status == rocksdb::Status::kOk) {
        add_write_cu(charged_bytes);
    }
}
// Accounts a multi-remove of `sort_keys` under `hash_key`.
//
// Nothing happens unless the status is kOk. On success the hash key is reported
// to the write hotkey collector with the number of sort keys as weight, and
// write CUs are charged by the summed size of hash key + sort key over all
// removed entries.
void capacity_unit_calculator::add_multi_remove_cu(int32_t status,
                                                   const dsn::blob &hash_key,
                                                   const std::vector<::dsn::blob> &sort_keys)
{
    if (status == rocksdb::Status::kOk) {
        int64_t charged_bytes = 0;
        for (auto it = sort_keys.begin(); it != sort_keys.end(); ++it) {
            charged_bytes += hash_key.size() + it->size();
        }

        const uint64_t key_count = sort_keys.size();
        _write_hotkey_collector->capture_hash_key(hash_key, key_count);
        add_write_cu(charged_bytes);
    }
}
// Accounts an incr on `key`.
//
// kOk charges a write of size 1 and then a read of size 1; kInvalidArgument
// charges only the read. Each charge also reports the key to the matching
// hotkey collector. Any other status charges nothing.
void capacity_unit_calculator::add_incr_cu(int32_t status, const dsn::blob &key)
{
    switch (status) {
    case rocksdb::Status::kOk:
        add_write_cu(1);
        _write_hotkey_collector->capture_raw_key(key, 1);
        break;
    case rocksdb::Status::kInvalidArgument:
        // Only the read part is charged.
        break;
    default:
        return;
    }

    add_read_cu(1);
    _read_hotkey_collector->capture_raw_key(key, 1);
}
// Accounts a check-and-set request.
//
// The check-and-set-bytes counter always receives the size of hash key + check
// sort key + set sort key + value. For kOk a write is charged by hash key + set
// sort key + value; for kOk, kInvalidArgument and kTryAgain a read is charged by
// hash key + check sort key. Each charge also reports the hash key to the
// matching hotkey collector. Any other status charges nothing.
void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
                                                    const dsn::blob &hash_key,
                                                    const dsn::blob &check_sort_key,
                                                    const dsn::blob &set_sort_key,
                                                    const dsn::blob &value)
{
    _pfc_check_and_set_bytes->add(hash_key.size() + check_sort_key.size() + set_sort_key.size() +
                                  value.size());

    switch (status) {
    case rocksdb::Status::kOk:
        add_write_cu(hash_key.size() + set_sort_key.size() + value.size());
        _write_hotkey_collector->capture_hash_key(hash_key, 1);
        break;
    case rocksdb::Status::kInvalidArgument:
    case rocksdb::Status::kTryAgain:
        // Only the read (check) part is charged.
        break;
    default:
        return;
    }

    add_read_cu(hash_key.size() + check_sort_key.size());
    _read_hotkey_collector->capture_hash_key(hash_key, 1);
}
// Accounts a check-and-mutate request.
//
// The check-and-mutate-bytes counter always receives hash key + check sort key
// plus the sort key and value size of every mutation. For kOk a write is
// charged with a size that counts the hash key once per mutation, and the hash
// key is reported to the write hotkey collector weighted by the number of
// mutations. For kOk, kInvalidArgument and kTryAgain a read is charged by hash
// key + check sort key and the hash key is reported to the read hotkey collector
// with weight 1. Any other status charges nothing.
void capacity_unit_calculator::add_check_and_mutate_cu(
    int32_t status,
    const dsn::blob &hash_key,
    const dsn::blob &check_sort_key,
    const std::vector<::dsn::apps::mutate> &mutate_list)
{
    int64_t payload_bytes = 0; // sort keys + values only
    int64_t charged_bytes = 0; // hash key counted once per mutation
    for (const auto &mutation : mutate_list) {
        const auto mutation_bytes = mutation.sort_key.size() + mutation.value.size();
        payload_bytes += mutation_bytes;
        charged_bytes += hash_key.size() + mutation_bytes;
    }
    _pfc_check_and_mutate_bytes->add(hash_key.size() + check_sort_key.size() + payload_bytes);

    switch (status) {
    case rocksdb::Status::kOk: {
        const uint64_t key_count = mutate_list.size();
        add_write_cu(charged_bytes);
        _write_hotkey_collector->capture_hash_key(hash_key, key_count);
        break;
    }
    case rocksdb::Status::kInvalidArgument:
    case rocksdb::Status::kTryAgain:
        // Only the read (check) part is charged.
        break;
    default:
        return;
    }

    add_read_cu(hash_key.size() + check_sort_key.size());
    _read_hotkey_collector->capture_hash_key(hash_key, 1);
}
} // namespace server
} // namespace pegasus