blob: 24db51445efc48d44664885d4e8bf51fdfdd7f23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pegasus_server_impl.h"
#include <unordered_map>
#include <dsn/utility/flags.h>
#include <rocksdb/filter_policy.h>
#include <dsn/dist/fmt_logging.h>
#include "capacity_unit_calculator.h"
#include "hashkey_transform.h"
#include "meta_store.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"
#include "hotkey_collector.h"
namespace pegasus {
namespace server {
// Flag [pegasus.server] rocksdb_limiter_max_write_megabytes_per_sec (default 500).
// The constructor below creates the shared rocksdb rate limiter only when this value
// is > 0, passing (value << 20) as the limiter's rate; a value <= 0 leaves
// _db_opts.rate_limiter unset, i.e. no write rate limit.
DSN_DEFINE_int64(
"pegasus.server",
rocksdb_limiter_max_write_megabytes_per_sec,
500,
"max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit");
// Flag [pegasus.server] rocksdb_limiter_enable_auto_tune (default false).
// Forwarded as the last argument of rocksdb::NewGenericRateLimiter() in the constructor
// below; it is only consulted when the rate limiter is actually created.
DSN_DEFINE_bool("pegasus.server",
rocksdb_limiter_enable_auto_tune,
false,
"whether to enable write rate auto tune when open rocksdb write limit");
// Translation table from the textual value of the [pegasus.server] rocksdb_index_type
// option to the corresponding rocksdb block-based-table index type. The constructor
// looks the configured string up here and asserts when it is not one of these keys.
static const std::unordered_map<std::string, rocksdb::BlockBasedTableOptions::IndexType>
    INDEX_TYPE_STRING_MAP{
        {"binary_search", rocksdb::BlockBasedTableOptions::kBinarySearch},
        {"hash_search", rocksdb::BlockBasedTableOptions::kHashSearch},
        {"two_level_index_search", rocksdb::BlockBasedTableOptions::kTwoLevelIndexSearch},
        {"binary_search_with_first_key",
         rocksdb::BlockBasedTableOptions::kBinarySearchWithFirstKey},
    };
// Constructs the storage server object of one replica.
//
// The constructor does not open the database (_db stays nullptr and _is_open stays
// false); it only:
//   1. reads the read-path thresholds and range-read limits from [pegasus.server];
//   2. fills rocksdb::DBOptions (_db_opts), the data/meta column family options
//      (_data_cf_opts/_meta_cf_opts) and the block-based table options;
//   3. creates, at most once per process (std::call_once), the objects shared by all
//      replicas: block cache, write rate limiter and write buffer manager;
//   4. reads the checkpoint reserve options and the rocksdb statistics update interval;
//   5. registers the perf counters, per replica ("<name>@<gpid>") or global.
//
// Invalid configuration (non-positive slow query threshold, unparsable compression type,
// unknown index type, unsupported format version or filter type) is reported via dassert.
//
// \param r the replica this storage server belongs to, forwarded to pegasus_read_service.
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
    : pegasus_read_service(r),
      _db(nullptr),
      _data_cf(nullptr),
      _meta_cf(nullptr),
      _is_open(false),
      _pegasus_data_version(data_version::VERSION_MAX),
      _last_durable_decree(0),
      _is_checkpointing(false),
      _manual_compact_svc(this),
      _partition_version(0)
{
    _primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
    _gpid = get_gpid();

    // one hotkey collector per direction (read / write)
    _read_hotkey_collector =
        std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this);
    _write_hotkey_collector =
        std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this);

    // read-path logging thresholds
    _verbose_log = dsn_config_get_value_bool("pegasus.server",
                                             "rocksdb_verbose_log",
                                             false,
                                             "whether to print verbose log for debugging");
    _slow_query_threshold_ns_in_config = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_slow_query_threshold_ns",
        100000000,
        "get/multi-get operation duration exceed this threshold will be logged");
    // the effective threshold starts from the configured one
    _slow_query_threshold_ns = _slow_query_threshold_ns_in_config;
    dassert(_slow_query_threshold_ns > 0, "slow query threshold must be greater than 0");
    _abnormal_get_size_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_get_size_threshold",
        1000000,
        "get operation value size exceed this threshold will be logged, 0 means no check");
    _abnormal_multi_get_size_threshold =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_abnormal_multi_get_size_threshold",
                                    10000000,
                                    "multi-get operation total key-value size exceed this "
                                    "threshold will be logged, 0 means no check");
    _abnormal_multi_get_iterate_count_threshold = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_abnormal_multi_get_iterate_count_threshold",
        1000,
        "multi-get operation iterate count exceed this threshold will be logged, 0 means no check");

    // range read limits
    _rng_rd_opts.multi_get_max_iteration_count = static_cast<uint32_t>(dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_multi_get_max_iteration_count",
        3000,
        "max iteration count for each range read for multi-get operation, if "
        "exceed this threshold, "
        "iterator will be stopped"));
    _rng_rd_opts.multi_get_max_iteration_size =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_multi_get_max_iteration_size",
                                    30 << 20,
                                    "multi-get operation total key-value size exceed "
                                    "this threshold will stop iterating rocksdb, 0 means no check");
    _rng_rd_opts.rocksdb_max_iteration_count =
        static_cast<uint32_t>(dsn_config_get_value_uint64("pegasus.server",
                                                          "rocksdb_max_iteration_count",
                                                          1000,
                                                          "max iteration count for each range "
                                                          "read, if exceed this threshold, "
                                                          "iterator will be stopped"));
    _rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_iteration_threshold_time_ms",
        30000,
        "max duration for handling one pegasus scan request(sortkey_count/multiget/scan) if exceed "
        "this threshold, iterator will be stopped, 0 means no check");
    _rng_rd_opts.rocksdb_iteration_threshold_time_ms =
        _rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config;

    // init rocksdb::DBOptions
    _db_opts.create_if_missing = true;
    // atomic flush data CF and meta CF, aim to keep consistency of 'last flushed decree' in meta CF
    // and data in data CF.
    _db_opts.atomic_flush = true;
    _db_opts.use_direct_reads = dsn_config_get_value_bool(
        "pegasus.server", "rocksdb_use_direct_reads", false, "rocksdb options.use_direct_reads");
    _db_opts.use_direct_io_for_flush_and_compaction =
        dsn_config_get_value_bool("pegasus.server",
                                  "rocksdb_use_direct_io_for_flush_and_compaction",
                                  false,
                                  "rocksdb options.use_direct_io_for_flush_and_compaction");
    _db_opts.compaction_readahead_size =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_compaction_readahead_size",
                                    2 * 1024 * 1024,
                                    "rocksdb options.compaction_readahead_size");
    _db_opts.writable_file_max_buffer_size =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_writable_file_max_buffer_size",
                                    1024 * 1024,
                                    "rocksdb options.writable_file_max_buffer_size");

    // statistics object of this DB instance; also kept in _statistics for later reads
    _statistics = rocksdb::CreateDBStatistics();
    _statistics->set_stats_level(rocksdb::kExceptDetailedTimers);
    _db_opts.statistics = _statistics;

    _db_opts.listeners.emplace_back(new pegasus_event_listener(this));

    // flush threads are shared among all rocksdb instances in one process.
    _db_opts.max_background_flushes =
        static_cast<int>(dsn_config_get_value_int64("pegasus.server",
                                                    "rocksdb_max_background_flushes",
                                                    4,
                                                    "rocksdb options.max_background_flushes"));
    // compaction threads are shared among all rocksdb instances in one process.
    _db_opts.max_background_compactions =
        static_cast<int>(dsn_config_get_value_int64("pegasus.server",
                                                    "rocksdb_max_background_compactions",
                                                    12,
                                                    "rocksdb options.max_background_compactions"));

    // init rocksdb::ColumnFamilyOptions for data column family
    _data_cf_opts.write_buffer_size =
        static_cast<size_t>(dsn_config_get_value_uint64("pegasus.server",
                                                        "rocksdb_write_buffer_size",
                                                        64 * 1024 * 1024,
                                                        "rocksdb options.write_buffer_size"));
    _data_cf_opts.max_write_buffer_number =
        static_cast<int>(dsn_config_get_value_int64("pegasus.server",
                                                    "rocksdb_max_write_buffer_number",
                                                    3,
                                                    "rocksdb options.max_write_buffer_number"));
    _data_cf_opts.num_levels = static_cast<int>(dsn_config_get_value_int64(
        "pegasus.server", "rocksdb_num_levels", 6, "rocksdb options.num_levels"));
    _data_cf_opts.target_file_size_base =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_target_file_size_base",
                                    64 * 1024 * 1024,
                                    "rocksdb options.target_file_size_base");
    _data_cf_opts.target_file_size_multiplier =
        static_cast<int>(dsn_config_get_value_int64("pegasus.server",
                                                    "rocksdb_target_file_size_multiplier",
                                                    1,
                                                    "rocksdb options.target_file_size_multiplier"));
    _data_cf_opts.max_bytes_for_level_base =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_max_bytes_for_level_base",
                                    10 * 64 * 1024 * 1024,
                                    "rocksdb options.max_bytes_for_level_base");
    _data_cf_opts.max_bytes_for_level_multiplier =
        dsn_config_get_value_double("pegasus.server",
                                    "rocksdb_max_bytes_for_level_multiplier",
                                    10,
                                    "rocksdb options.rocksdb_max_bytes_for_level_multiplier");

    // we need set max_compaction_bytes definitely because set_usage_scenario() depends on it.
    _data_cf_opts.max_compaction_bytes = _data_cf_opts.target_file_size_base * 25;
    _data_cf_opts.level0_file_num_compaction_trigger = static_cast<int>(
        dsn_config_get_value_int64("pegasus.server",
                                   "rocksdb_level0_file_num_compaction_trigger",
                                   4,
                                   "rocksdb options.level0_file_num_compaction_trigger"));
    _data_cf_opts.level0_slowdown_writes_trigger = static_cast<int>(dsn_config_get_value_int64(
        "pegasus.server",
        "rocksdb_level0_slowdown_writes_trigger",
        30,
        "rocksdb options.level0_slowdown_writes_trigger, default 30"));
    _data_cf_opts.level0_stop_writes_trigger =
        static_cast<int>(dsn_config_get_value_int64("pegasus.server",
                                                    "rocksdb_level0_stop_writes_trigger",
                                                    60,
                                                    "rocksdb options.level0_stop_writes_trigger"));

    std::string compression_str = dsn_config_get_value_string(
        "pegasus.server",
        "rocksdb_compression_type",
        "lz4",
        "rocksdb options.compression. Available config: '[none|snappy|zstd|lz4]' "
        "for all level 2 and higher levels, and "
        "'per_level:[none|snappy|zstd|lz4],[none|snappy|zstd|lz4],...' for each level 0,1,..., the "
        "last compression type will be used for levels not specified in the list.");
    // The parse fills compression_per_level, so it is performed outside of the assertion
    // and only its result is asserted.
    const bool data_compression_parsed =
        parse_compression_types(compression_str, _data_cf_opts.compression_per_level);
    dassert(data_compression_parsed, "parse rocksdb_compression_type failed.");

    // meta CF starts as a copy of the data CF options and then overrides a few of them
    _meta_cf_opts = _data_cf_opts;
    // Set level0_file_num_compaction_trigger of meta CF as 10 to reduce frequent compaction.
    _meta_cf_opts.level0_file_num_compaction_trigger = 10;
    // Data in meta CF is very little, disable compression to save CPU load.
    const bool meta_compression_parsed =
        parse_compression_types("none", _meta_cf_opts.compression_per_level);
    dassert(meta_compression_parsed, "parse rocksdb_compression_type failed.");

    rocksdb::BlockBasedTableOptions tbl_opts;
    if (dsn_config_get_value_bool("pegasus.server",
                                  "rocksdb_disable_table_block_cache",
                                  false,
                                  "rocksdb tbl_opts.no_block_cache")) {
        tbl_opts.no_block_cache = true;
        tbl_opts.block_restart_interval = 4;
    } else {
        // If block cache is enabled, all replicas on this server will share the same block cache
        // object. It's convenient to control the total memory used by this server, and the LRU
        // algorithm used by the block cache object can be more efficient in this way.
        static std::once_flag block_cache_once;
        std::call_once(block_cache_once, [&]() {
            uint64_t capacity = dsn_config_get_value_uint64(
                "pegasus.server",
                "rocksdb_block_cache_capacity",
                10 * 1024 * 1024 * 1024ULL,
                "block cache capacity for one pegasus server, shared by all rocksdb instances");

            // block cache num shard bits, default -1(auto)
            int num_shard_bits = static_cast<int>(dsn_config_get_value_int64(
                "pegasus.server",
                "rocksdb_block_cache_num_shard_bits",
                -1,
                "block cache will be sharded into 2^num_shard_bits shards"));

            // init block cache
            _s_block_cache = rocksdb::NewLRUCache(capacity, num_shard_bits);
        });

        // every replica has the same block cache
        tbl_opts.block_cache = _s_block_cache;
    }

    // FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit.
    // For more detail arguments see
    // https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/rate_limiter.h#L111-L137
    if (FLAGS_rocksdb_limiter_max_write_megabytes_per_sec > 0) {
        static std::once_flag rate_limiter_once;
        std::call_once(rate_limiter_once, [&]() {
            _s_rate_limiter = std::shared_ptr<rocksdb::RateLimiter>(rocksdb::NewGenericRateLimiter(
                FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20, // MB/s -> bytes/s
                100 * 1000,                                              // refill_period_us
                10,                                                      // fairness
                rocksdb::RateLimiter::Mode::kWritesOnly,
                FLAGS_rocksdb_limiter_enable_auto_tune));
        });
        _db_opts.rate_limiter = _s_rate_limiter;
    }

    bool enable_write_buffer_manager =
        dsn_config_get_value_bool("pegasus.server",
                                  "rocksdb_enable_write_buffer_manager",
                                  false,
                                  "enable write buffer manager to limit total memory "
                                  "used by memtables and block caches across multiple replicas");
    ddebug_replica("rocksdb_enable_write_buffer_manager = {}", enable_write_buffer_manager);
    if (enable_write_buffer_manager) {
        // If write buffer manager is enabled, all replicas(one DB instance for each
        // replica) on this server will share the same write buffer manager object,
        // thus the same block cache object. It's convenient to control the total memory
        // of memtables and block caches used by this server.
        //
        // While write buffer manager is enabled, total_size_across_write_buffer = 0
        // indicates no limit on memory, for details see:
        // https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/write_buffer_manager.h#L23-24
        static std::once_flag write_buffer_manager_once;
        std::call_once(write_buffer_manager_once, [&]() {
            uint64_t total_size_across_write_buffer = dsn_config_get_value_uint64(
                "pegasus.server",
                "rocksdb_total_size_across_write_buffer",
                0,
                "total size limit used by memtables across multiple replicas");
            ddebug_replica("rocksdb_total_size_across_write_buffer = {}",
                           total_size_across_write_buffer);
            // tbl_opts.block_cache is whatever was set above (left unset when the table
            // block cache is disabled).
            _s_write_buffer_manager = std::make_shared<rocksdb::WriteBufferManager>(
                static_cast<size_t>(total_size_across_write_buffer), tbl_opts.block_cache);
        });
        _db_opts.write_buffer_manager = _s_write_buffer_manager;
    }

    int64_t max_open_files = dsn_config_get_value_int64(
        "pegasus.server",
        "rocksdb_max_open_files",
        -1, /* always keep files opened, default by rocksdb */
        "number of opened files that can be used by a replica(namely a DB instance)");
    _db_opts.max_open_files = static_cast<int>(max_open_files);
    ddebug_replica("rocksdb_max_open_files = {}", _db_opts.max_open_files);

    // index type of the block based table, validated against INDEX_TYPE_STRING_MAP
    std::string index_type =
        dsn_config_get_value_string("pegasus.server",
                                    "rocksdb_index_type",
                                    "binary_search",
                                    "The index type that will be used for this table.");
    auto index_type_item = INDEX_TYPE_STRING_MAP.find(index_type);
    dassert(index_type_item != INDEX_TYPE_STRING_MAP.end(),
            "[pegasus.server]rocksdb_index_type should be one among binary_search, "
            "hash_search, two_level_index_search or binary_search_with_first_key.");
    tbl_opts.index_type = index_type_item->second;
    ddebug_replica("rocksdb_index_type = {}", index_type.c_str());

    tbl_opts.partition_filters = dsn_config_get_value_bool(
        "pegasus.server",
        "rocksdb_partition_filters",
        false,
        "Note: currently this option requires two_level_index_search to be set as well. "
        "Use partitioned full filters for each SST file. This option is "
        "incompatibile with block-based filters.");
    ddebug_replica("rocksdb_partition_filters = {}", tbl_opts.partition_filters);

    tbl_opts.metadata_block_size = dsn_config_get_value_uint64(
        "pegasus.server",
        "rocksdb_metadata_block_size",
        4096,
        "Block size for partitioned metadata. Currently applied to indexes when "
        "two_level_index_search is used and to filters when partition_filters is used. "
        "Note: Since in the current implementation the filters and index partitions "
        "are aligned, an index/filter block is created when either index or filter "
        "block size reaches the specified limit. "
        "Note: this limit is currently applied to only index blocks; a filter "
        "partition is cut right after an index block is cut");
    ddebug_replica("rocksdb_metadata_block_size = {}", tbl_opts.metadata_block_size);

    tbl_opts.cache_index_and_filter_blocks = dsn_config_get_value_bool(
        "pegasus.server",
        "rocksdb_cache_index_and_filter_blocks",
        false,
        "Indicating if we'd put index/filter blocks to the block cache. "
        "If not specified, each \"table reader\" object will pre-load index/filter "
        "block during table initialization.");
    ddebug_replica("rocksdb_cache_index_and_filter_blocks = {}",
                   tbl_opts.cache_index_and_filter_blocks);

    tbl_opts.pin_top_level_index_and_filter = dsn_config_get_value_bool(
        "pegasus.server",
        "rocksdb_pin_top_level_index_and_filter",
        true,
        "If cache_index_and_filter_blocks is true and the below is true, then "
        "the top-level index of partitioned filter and index blocks are stored in "
        "the cache, but a reference is held in the \"table reader\" object so the "
        "blocks are pinned and only evicted from cache when the table reader is "
        "freed. This is not limited to l0 in LSM tree.");
    ddebug_replica("rocksdb_pin_top_level_index_and_filter = {}",
                   tbl_opts.pin_top_level_index_and_filter);

    tbl_opts.cache_index_and_filter_blocks_with_high_priority = dsn_config_get_value_bool(
        "pegasus.server",
        "rocksdb_cache_index_and_filter_blocks_with_high_priority",
        true,
        "If cache_index_and_filter_blocks is enabled, cache index and filter "
        "blocks with high priority. If set to true, depending on implementation of "
        "block cache, index and filter blocks may be less likely to be evicted "
        "than data blocks.");
    ddebug_replica("rocksdb_cache_index_and_filter_blocks_with_high_priority = {}",
                   tbl_opts.cache_index_and_filter_blocks_with_high_priority);

    tbl_opts.pin_l0_filter_and_index_blocks_in_cache = dsn_config_get_value_bool(
        "pegasus.server",
        "rocksdb_pin_l0_filter_and_index_blocks_in_cache",
        false,
        "if cache_index_and_filter_blocks is true and the below is true, then "
        "filter and index blocks are stored in the cache, but a reference is "
        "held in the \"table reader\" object so the blocks are pinned and only "
        "evicted from cache when the table reader is freed.");
    ddebug_replica("rocksdb_pin_l0_filter_and_index_blocks_in_cache = {}",
                   tbl_opts.pin_l0_filter_and_index_blocks_in_cache);

    // Bloom filter configurations.
    bool disable_bloom_filter = dsn_config_get_value_bool(
        "pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter");
    if (!disable_bloom_filter) {
        // average bits allocated per key in bloom filter.
        // bits_per_key    |           false positive rate
        //                 | format_version < 5 | format_version = 5
        //       6               5.70953             5.69888
        //       8               2.45766             2.29709
        //       10              1.13977             0.959254
        //       12              0.662498            0.411593
        //       16              0.353023            0.0873754
        //       24              0.261552            0.0060971
        //       50              0.225453            ~0.00003
        // Recommend using no more than three decimal digits after the decimal point, as in 6.667.
        // More details: https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter
        double bits_per_key =
            dsn_config_get_value_double("pegasus.server",
                                        "rocksdb_bloom_filter_bits_per_key",
                                        10,
                                        "average bits allocated per key in bloom filter");
        // COMPATIBILITY ATTENTION:
        // Although old releases would see the new structure as corrupt filter data and read the
        // table as if there's no filter, we've decided only to enable the new Bloom filter with new
        // format_version=5. This provides a smooth path for automatic adoption over time, with an
        // option for early opt-in.
        // Reference from rocksdb commit:
        // https://github.com/facebook/rocksdb/commit/f059c7d9b96300091e07429a60f4ad55dac84859
        //
        // NOTE(review): rocksdb_format_version is only read and applied inside this branch, so
        // it has no effect when rocksdb_disable_bloom_filter is true - confirm this is intended.
        int format_version =
            static_cast<int>(dsn_config_get_value_int64("pegasus.server",
                                                        "rocksdb_format_version",
                                                        2,
                                                        "block based table data format version, "
                                                        "only 2 and 5 is supported in Pegasus. "
                                                        "2 is the old version, 5 is the new "
                                                        "version supported since rocksdb "
                                                        "v6.6.4"));
        dassert(format_version == 2 || format_version == 5,
                "[pegasus.server]rocksdb_format_version should be either '2' or '5'.");
        tbl_opts.format_version = format_version;
        tbl_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bits_per_key, false));

        std::string filter_type =
            dsn_config_get_value_string("pegasus.server",
                                        "rocksdb_filter_type",
                                        "prefix",
                                        "Bloom filter type, should be either 'common' or 'prefix'");
        dassert(filter_type == "common" || filter_type == "prefix",
                "[pegasus.server]rocksdb_filter_type should be either 'common' or 'prefix'.");
        if (filter_type == "prefix") {
            // prefix filtering: install the HashkeyTransform prefix extractor on the data CF
            _data_cf_opts.prefix_extractor.reset(new HashkeyTransform());
            _data_cf_opts.memtable_prefix_bloom_size_ratio = 0.1;
            _data_cf_rd_opts.prefix_same_as_start = true;
        }
    }

    // both column families use a block based table built from the same table options
    _data_cf_opts.table_factory.reset(NewBlockBasedTableFactory(tbl_opts));
    _meta_cf_opts.table_factory.reset(NewBlockBasedTableFactory(tbl_opts));

    // the TTL compaction filter is installed on the data CF only
    _key_ttl_compaction_filter_factory = std::make_shared<KeyWithTTLCompactionFilterFactory>();
    _data_cf_opts.compaction_filter_factory = _key_ttl_compaction_filter_factory;

    // get the checkpoint reserve options.
    _checkpoint_reserve_min_count_in_config = static_cast<uint32_t>(dsn_config_get_value_uint64(
        "pegasus.server", "checkpoint_reserve_min_count", 2, "checkpoint_reserve_min_count"));
    _checkpoint_reserve_min_count = _checkpoint_reserve_min_count_in_config;
    _checkpoint_reserve_time_seconds_in_config = static_cast<uint32_t>(
        dsn_config_get_value_uint64("pegasus.server",
                                    "checkpoint_reserve_time_seconds",
                                    1800,
                                    "checkpoint_reserve_time_seconds, 0 means no check"));
    _checkpoint_reserve_time_seconds = _checkpoint_reserve_time_seconds_in_config;

    _update_rdb_stat_interval = std::chrono::seconds(dsn_config_get_value_uint64(
        "pegasus.server", "update_rdb_stat_interval", 600, "update_rdb_stat_interval, in seconds"));

    // TODO: move the qps/latency counters and it's statistics to replication_app_base layer
    std::string str_gpid = _gpid.to_string();
    // scratch buffer reused for every per-replica counter name ("<counter>@<gpid>")
    char name[256];

    // register the perf counters
    snprintf(name, 255, "get_qps@%s", str_gpid.c_str());
    _pfc_get_qps.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of GET request");

    snprintf(name, 255, "multi_get_qps@%s", str_gpid.c_str());
    _pfc_multi_get_qps.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of MULTI_GET request");

    snprintf(name, 255, "scan_qps@%s", str_gpid.c_str());
    _pfc_scan_qps.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of SCAN request");

    snprintf(name, 255, "get_latency@%s", str_gpid.c_str());
    _pfc_get_latency.init_app_counter("app.pegasus",
                                      name,
                                      COUNTER_TYPE_NUMBER_PERCENTILES,
                                      "statistic the latency of GET request");

    snprintf(name, 255, "multi_get_latency@%s", str_gpid.c_str());
    _pfc_multi_get_latency.init_app_counter("app.pegasus",
                                            name,
                                            COUNTER_TYPE_NUMBER_PERCENTILES,
                                            "statistic the latency of MULTI_GET request");

    snprintf(name, 255, "scan_latency@%s", str_gpid.c_str());
    _pfc_scan_latency.init_app_counter("app.pegasus",
                                       name,
                                       COUNTER_TYPE_NUMBER_PERCENTILES,
                                       "statistic the latency of SCAN request");

    snprintf(name, 255, "recent.expire.count@%s", str_gpid.c_str());
    _pfc_recent_expire_count.init_app_counter("app.pegasus",
                                              name,
                                              COUNTER_TYPE_VOLATILE_NUMBER,
                                              "statistic the recent expired value read count");

    snprintf(name, 255, "recent.filter.count@%s", str_gpid.c_str());
    _pfc_recent_filter_count.init_app_counter("app.pegasus",
                                              name,
                                              COUNTER_TYPE_VOLATILE_NUMBER,
                                              "statistic the recent filtered value read count");

    snprintf(name, 255, "recent.abnormal.count@%s", str_gpid.c_str());
    _pfc_recent_abnormal_count.init_app_counter("app.pegasus",
                                                name,
                                                COUNTER_TYPE_VOLATILE_NUMBER,
                                                "statistic the recent abnormal read count");

    snprintf(name, 255, "disk.storage.sst.count@%s", str_gpid.c_str());
    _pfc_rdb_sst_count.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the count of sstable files");

    snprintf(name, 255, "disk.storage.sst(MB)@%s", str_gpid.c_str());
    _pfc_rdb_sst_size.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the size of sstable files");

    snprintf(name, 255, "rdb.block_cache.hit_count@%s", str_gpid.c_str());
    _pfc_rdb_block_cache_hit_count.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the hit count of rocksdb block cache");

    snprintf(name, 255, "rdb.block_cache.total_count@%s", str_gpid.c_str());
    _pfc_rdb_block_cache_total_count.init_app_counter(
        "app.pegasus",
        name,
        COUNTER_TYPE_NUMBER,
        "statistic the total count of rocksdb block cache");

    // These counters are singletons on this server shared by all replicas, so we initialize
    // them only once.
    static std::once_flag global_counters_once;
    std::call_once(global_counters_once, [&]() {
        _pfc_rdb_block_cache_mem_usage.init_global_counter(
            "replica",
            "app.pegasus",
            "rdb.block_cache.memory_usage",
            COUNTER_TYPE_NUMBER,
            "statistic the memory usage of rocksdb block cache");

        _pfc_rdb_write_limiter_rate_bytes.init_global_counter(
            "replica",
            "app.pegasus",
            "rdb.write_limiter_rate_bytes",
            COUNTER_TYPE_NUMBER,
            "statistic the through bytes of rocksdb write rate limiter");
    });

    snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str());
    _pfc_rdb_index_and_filter_blocks_mem_usage.init_app_counter(
        "app.pegasus",
        name,
        COUNTER_TYPE_NUMBER,
        "statistic the memory usage of rocksdb index and filter blocks");

    snprintf(name, 255, "rdb.memtable.memory_usage@%s", str_gpid.c_str());
    _pfc_rdb_memtable_mem_usage.init_app_counter(
        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the memory usage of rocksdb memtable");

    snprintf(name, 255, "rdb.estimate_num_keys@%s", str_gpid.c_str());
    _pfc_rdb_estimate_num_keys.init_app_counter(
        "app.pegasus",
        name,
        COUNTER_TYPE_NUMBER,
        "statistics the estimated number of keys inside the rocksdb");

    snprintf(name, 255, "rdb.bf_seek_negatives@%s", str_gpid.c_str());
    _pfc_rdb_bf_seek_negatives.init_app_counter("app.pegasus",
                                                name,
                                                COUNTER_TYPE_NUMBER,
                                                "statistics the number of times bloom filter was "
                                                "checked before creating iterator on a file and "
                                                "useful in avoiding iterator creation (and thus "
                                                "likely IOPs)");

    snprintf(name, 255, "rdb.bf_seek_total@%s", str_gpid.c_str());
    _pfc_rdb_bf_seek_total.init_app_counter("app.pegasus",
                                            name,
                                            COUNTER_TYPE_NUMBER,
                                            "statistics the number of times bloom filter was "
                                            "checked before creating iterator on a file");

    // true positives: the filter did not avoid the read and the data was really there
    snprintf(name, 255, "rdb.bf_point_positive_true@%s", str_gpid.c_str());
    _pfc_rdb_bf_point_positive_true.init_app_counter(
        "app.pegasus",
        name,
        COUNTER_TYPE_NUMBER,
        "statistics the number of times bloom FullFilter has not avoided the reads and data "
        "actually exist");

    snprintf(name, 255, "rdb.bf_point_positive_total@%s", str_gpid.c_str());
    _pfc_rdb_bf_point_positive_total.init_app_counter(
        "app.pegasus",
        name,
        COUNTER_TYPE_NUMBER,
        "statistics the number of times bloom FullFilter has not avoided the reads");

    // negatives: the filter avoided the file read
    snprintf(name, 255, "rdb.bf_point_negatives@%s", str_gpid.c_str());
    _pfc_rdb_bf_point_negatives.init_app_counter(
        "app.pegasus",
        name,
        COUNTER_TYPE_NUMBER,
        "statistics the number of times bloom filter has avoided file reads, i.e., negatives");
}
} // namespace server
} // namespace pegasus