/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pegasus_server_impl.h"
#include <algorithm>
#include <boost/lexical_cast.hpp>
#include <rocksdb/convenience.h>
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/utilities/options_util.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication.codes.h>
#include <dsn/utility/flags.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
#include "capacity_unit_calculator.h"
#include "pegasus_server_write.h"
#include "meta_store.h"
#include "hotkey_collector.h"
using namespace dsn::literals::chrono_literals;
namespace pegasus {
namespace server {
DEFINE_TASK_CODE(LPC_PEGASUS_SERVER_DELAY, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
DSN_DECLARE_int32(read_amp_bytes_per_bit);
DSN_DEFINE_int32("pegasus.server",
hotkey_analyse_time_interval_s,
10,
"hotkey analyse interval in seconds");
static std::string chkpt_get_dir_name(int64_t decree)
{
char buffer[256];
snprintf(buffer, sizeof(buffer), "checkpoint.%" PRId64, decree);
return std::string(buffer);
}
static bool chkpt_init_from_dir(const char *name, int64_t &decree)
{
// sscanf needs the scan-side macro SCNd64 (PRId64 belongs to the printf family)
return 1 == sscanf(name, "checkpoint.%" SCNd64, &decree) &&
std::string(name) == chkpt_get_dir_name(decree);
}
std::shared_ptr<rocksdb::RateLimiter> pegasus_server_impl::_s_rate_limiter;
int64_t pegasus_server_impl::_rocksdb_limiter_last_total_through;
std::shared_ptr<rocksdb::Cache> pegasus_server_impl::_s_block_cache;
std::shared_ptr<rocksdb::WriteBufferManager> pegasus_server_impl::_s_write_buffer_manager;
::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_write_limiter_rate_bytes;
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10);
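// Rebuild the in-memory checkpoint list by scanning data_dir() for "checkpoint.<decree>"
// subdirectories. Malformed checkpoint directories are removed, and last_durable_decree is
// reset to the newest checkpoint found (or 0 if there is none).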
void pegasus_server_impl::parse_checkpoints()
{
std::vector<std::string> dirs;
::dsn::utils::filesystem::get_subdirectories(data_dir(), dirs, false);
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
_checkpoints.clear();
for (auto &d : dirs) {
int64_t ci;
std::string d1 = d.substr(data_dir().size() + 1);
if (chkpt_init_from_dir(d1.c_str(), ci)) {
_checkpoints.push_back(ci);
} else if (d1.find("checkpoint") != std::string::npos) {
ddebug("%s: invalid checkpoint directory %s, remove it", replica_name(), d.c_str());
::dsn::utils::filesystem::remove_path(d);
if (!::dsn::utils::filesystem::remove_path(d)) {
derror(
"%s: remove invalid checkpoint directory %s failed", replica_name(), d.c_str());
}
}
}
if (!_checkpoints.empty()) {
std::sort(_checkpoints.begin(), _checkpoints.end());
set_last_durable_decree(_checkpoints.back());
} else {
set_last_durable_decree(0);
}
}
pegasus_server_impl::~pegasus_server_impl()
{
if (_is_open) {
dassert(_db != nullptr, "");
release_db();
}
}
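// Garbage-collect stale checkpoints. The policy implemented below: always keep the newest
// `min_count` checkpoints, and when a reserve time is configured, also keep any checkpoint
// whose CURRENT file was written within the last `reserve_time` seconds. When
// force_reserve_one is true, only the newest checkpoint is reserved, regardless of age.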
void pegasus_server_impl::gc_checkpoints(bool force_reserve_one)
{
int min_count = force_reserve_one ? 1 : _checkpoint_reserve_min_count;
uint64_t reserve_time = force_reserve_one ? 0 : _checkpoint_reserve_time_seconds;
std::deque<int64_t> temp_list;
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
if (_checkpoints.size() <= min_count)
return;
temp_list = _checkpoints;
}
// find the max checkpoint which can be deleted
int64_t max_del_d = -1;
uint64_t current_time = dsn_now_ms() / 1000;
for (int i = 0; i < temp_list.size(); ++i) {
if (i + min_count >= temp_list.size())
break;
int64_t d = temp_list[i];
if (reserve_time > 0) {
// we check the last write time of "CURRENT" instead of the directory, because the
// directory's last write time may have been updated by a previous incomplete garbage
// collection.
auto cpt_dir =
::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(d));
auto current_file = ::dsn::utils::filesystem::path_combine(cpt_dir, "CURRENT");
if (!::dsn::utils::filesystem::file_exists(current_file)) {
max_del_d = d;
continue;
}
time_t tm;
if (!dsn::utils::filesystem::last_write_time(current_file, tm)) {
dwarn("get last write time of file %s failed", current_file.c_str());
break;
}
auto last_write_time = (uint64_t)tm;
if (last_write_time + reserve_time >= current_time) {
// not expired
break;
}
}
max_del_d = d;
}
if (max_del_d == -1) {
// no checkpoint to delete
ddebug("%s: no checkpoint to garbage collection, checkpoints_count = %d",
replica_name(),
(int)temp_list.size());
return;
}
std::list<int64_t> to_delete_list;
int64_t min_d = 0;
int64_t max_d = 0;
int checkpoints_count = 0;
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
int delete_max_index = -1;
for (int i = 0; i < _checkpoints.size(); ++i) {
int64_t del_d = _checkpoints[i];
if (i + min_count >= _checkpoints.size() || del_d > max_del_d)
break;
to_delete_list.push_back(del_d);
delete_max_index = i;
}
if (delete_max_index >= 0) {
_checkpoints.erase(_checkpoints.begin(), _checkpoints.begin() + delete_max_index + 1);
}
if (!_checkpoints.empty()) {
min_d = _checkpoints.front();
max_d = _checkpoints.back();
checkpoints_count = _checkpoints.size();
} else {
min_d = 0;
max_d = 0;
checkpoints_count = 0;
}
}
// do delete
std::list<int64_t> put_back_list;
for (auto &del_d : to_delete_list) {
auto cpt_dir =
::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(del_d));
if (::dsn::utils::filesystem::directory_exists(cpt_dir)) {
if (::dsn::utils::filesystem::remove_path(cpt_dir)) {
ddebug("%s: checkpoint directory %s removed by garbage collection",
replica_name(),
cpt_dir.c_str());
} else {
derror("%s: checkpoint directory %s remove failed by garbage collection",
replica_name(),
cpt_dir.c_str());
put_back_list.push_back(del_d);
}
} else {
ddebug("%s: checkpoint directory %s does not exist, ignored by garbage collection",
replica_name(),
cpt_dir.c_str());
}
}
// put back the checkpoints that were not deleted, so they can be deleted again in the next
// gc round.
// ATTENTION: a put-back checkpoint may be incomplete, which will cause failure on load. But
// it would not cause data loss, because an incomplete checkpoint cannot be loaded
// successfully.
if (!put_back_list.empty()) {
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
if (_checkpoints.empty() || put_back_list.back() < _checkpoints.front()) {
// inserting at the front keeps the list sorted
_checkpoints.insert(_checkpoints.begin(), put_back_list.begin(), put_back_list.end());
} else {
// need to re-sort
_checkpoints.insert(_checkpoints.begin(), put_back_list.begin(), put_back_list.end());
std::sort(_checkpoints.begin(), _checkpoints.end());
}
if (!_checkpoints.empty()) {
min_d = _checkpoints.front();
max_d = _checkpoints.back();
checkpoints_count = _checkpoints.size();
} else {
min_d = 0;
max_d = 0;
checkpoints_count = 0;
}
}
ddebug("%s: after checkpoint garbage collection, checkpoints_count = %d, "
"min_checkpoint = %" PRId64 ", max_checkpoint = %" PRId64,
replica_name(),
checkpoints_count,
min_d,
max_d);
}
int pegasus_server_impl::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
int count)
{
dassert(_is_open, "");
dassert(requests != nullptr, "");
return _server_write->on_batched_write_requests(requests, count, decree, timestamp);
}
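// Handle a point GET: look up the raw key in the data column family, treat TTL-expired
// records as not-found, and record slow/abnormal reads in the perf counters.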
void pegasus_server_impl::on_get(get_rpc rpc)
{
dassert(_is_open, "");
_pfc_get_qps->increment();
uint64_t start_time = dsn_now_ns();
const auto &key = rpc.request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
rocksdb::Slice skey(key.data(), key.length());
std::string value;
rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, &value);
if (status.ok()) {
if (check_if_record_expired(utils::epoch_now(), value)) {
_pfc_recent_expire_count->increment();
if (_verbose_log) {
derror("%s: rocksdb data expired for get from %s",
replica_name(),
rpc.remote_address().to_string());
}
status = rocksdb::Status::NotFound();
}
}
if (!status.ok()) {
if (_verbose_log) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
derror("%s: rocksdb get failed for get from %s: "
"hash_key = \"%s\", sort_key = \"%s\", error = %s",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(hash_key).c_str(),
::pegasus::utils::c_escape_string(sort_key).c_str(),
status.ToString().c_str());
} else if (!status.IsNotFound()) {
derror("%s: rocksdb get failed for get from %s: error = %s",
replica_name(),
rpc.remote_address().to_string(),
status.ToString().c_str());
}
}
#ifdef PEGASUS_UNIT_TEST
// sleep 10ms for unit test,
// so when we set slow_query_threshold <= 10ms, it will be a slow query
usleep(10 * 1000);
#endif
uint64_t time_used = dsn_now_ns() - start_time;
if (is_get_abnormal(time_used, value.size())) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
dwarn_replica("rocksdb abnormal get from {}: "
"hash_key = {}, sort_key = {}, return = {}, "
"value_size = {}, time_used = {} ns",
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(hash_key),
::pegasus::utils::c_escape_string(sort_key),
status.ToString(),
value.size(),
time_used);
_pfc_recent_abnormal_count->increment();
}
resp.error = status.code();
if (status.ok()) {
pegasus_extract_user_data(_pegasus_data_version, std::move(value), resp.value);
}
_cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value);
_pfc_get_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
{
dassert(_is_open, "");
_pfc_multi_get_qps->increment();
uint64_t start_time = dsn_now_ns();
const auto &request = rpc.request();
dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
if (!is_filter_type_supported(request.sort_key_filter_type)) {
derror("%s: invalid argument for multi_get from %s: "
"sort key filter type %d not supported",
replica_name(),
rpc.remote_address().to_string(),
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
return;
}
uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX;
uint32_t max_iteration_count =
std::min(max_kv_count, _rng_rd_opts.multi_get_max_iteration_count);
int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0
? _rng_rd_opts.multi_get_max_iteration_size
: INT_MAX;
int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config);
uint32_t epoch_now = ::pegasus::utils::epoch_now();
int32_t count = 0;
int64_t size = 0;
int32_t iteration_count = 0;
int32_t expire_count = 0;
int32_t filter_count = 0;
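// Two execution paths: an empty sort_keys list means a (possibly filtered) range scan over
// all sort keys of the hash key; otherwise we issue a batched MultiGet on the explicitly
// listed sort keys.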
if (request.sort_keys.empty()) {
::dsn::blob range_start_key, range_stop_key;
pegasus_generate_key(range_start_key, request.hash_key, request.start_sortkey);
bool start_inclusive = request.start_inclusive;
bool stop_inclusive;
if (request.stop_sortkey.length() == 0) {
pegasus_generate_next_blob(range_stop_key, request.hash_key);
stop_inclusive = false;
} else {
pegasus_generate_key(range_stop_key, request.hash_key, request.stop_sortkey);
stop_inclusive = request.stop_inclusive;
}
rocksdb::Slice start(range_start_key.data(), range_start_key.length());
rocksdb::Slice stop(range_stop_key.data(), range_stop_key.length());
// limit key range by prefix filter
::dsn::blob prefix_start_key, prefix_stop_key;
if (request.sort_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
request.sort_key_filter_pattern.length() > 0) {
pegasus_generate_key(
prefix_start_key, request.hash_key, request.sort_key_filter_pattern);
pegasus_generate_next_blob(
prefix_stop_key, request.hash_key, request.sort_key_filter_pattern);
rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
if (prefix_start.compare(start) > 0) {
start = prefix_start;
start_inclusive = true;
}
rocksdb::Slice prefix_stop(prefix_stop_key.data(), prefix_stop_key.length());
if (prefix_stop.compare(stop) <= 0) {
stop = prefix_stop;
stop_inclusive = false;
}
}
// check if range is empty
int c = start.compare(stop);
if (c > 0 || (c == 0 && (!start_inclusive || !stop_inclusive))) {
// empty sort key range
if (_verbose_log) {
dwarn("%s: empty sort key range for multi_get from %s: hash_key = \"%s\", "
"start_sort_key = \"%s\" (%s), stop_sort_key = \"%s\" (%s), "
"sort_key_filter_type = %s, sort_key_filter_pattern = \"%s\", "
"final_start = \"%s\" (%s), final_stop = \"%s\" (%s)",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(request.hash_key).c_str(),
::pegasus::utils::c_escape_string(request.start_sortkey).c_str(),
request.start_inclusive ? "inclusive" : "exclusive",
::pegasus::utils::c_escape_string(request.stop_sortkey).c_str(),
request.stop_inclusive ? "inclusive" : "exclusive",
::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)
->second,
::pegasus::utils::c_escape_string(request.sort_key_filter_pattern).c_str(),
::pegasus::utils::c_escape_string(start).c_str(),
start_inclusive ? "inclusive" : "exclusive",
::pegasus::utils::c_escape_string(stop).c_str(),
stop_inclusive ? "inclusive" : "exclusive");
}
resp.error = rocksdb::Status::kOk;
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
return;
}
std::unique_ptr<rocksdb::Iterator> it;
bool complete = false;
std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(max_iteration_count,
max_iteration_size,
_rng_rd_opts.rocksdb_iteration_threshold_time_ms);
if (!request.reverse) {
it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf));
it->Seek(start);
bool first_exclusive = !start_inclusive;
while (limiter->valid() && it->Valid()) {
// check stop sort key
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
// out of range
complete = true;
break;
}
// check start sort key
if (first_exclusive) {
first_exclusive = false;
if (it->key().compare(start) == 0) {
// discard start_sortkey
it->Next();
continue;
}
}
limiter->add_count();
// extract value
auto state = append_key_value_for_multi_get(resp.kvs,
it->key(),
it->value(),
request.sort_key_filter_type,
request.sort_key_filter_pattern,
epoch_now,
request.no_value);
switch (state) {
case range_iteration_state::kNormal: {
count++;
auto &kv = resp.kvs.back();
uint64_t kv_size = kv.key.length() + kv.value.length();
size += kv_size;
limiter->add_size(kv_size);
} break;
case range_iteration_state::kExpired:
expire_count++;
break;
case range_iteration_state::kFiltered:
filter_count++;
break;
default:
break;
}
if (c == 0) {
// reached the stop position
complete = true;
break;
}
it->Next();
}
} else { // reverse
rocksdb::ReadOptions rd_opts(_data_cf_rd_opts);
if (_data_cf_opts.prefix_extractor) {
// NOTE: The prefix bloom filter is not supported in reverse seek mode (see
// https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes#limitation for
// more details), so we have to do a total-order seek on rocksdb, which may perform
// worse. However, we consider reverse scan a rare use case; if your workload has
// many reverse scans, you'd better use the 'common' bloom filter (by setting
// [pegasus.server]rocksdb_filter_type to 'common').
rd_opts.total_order_seek = true;
rd_opts.prefix_same_as_start = false;
}
it.reset(_db->NewIterator(rd_opts, _data_cf));
it->SeekForPrev(stop);
bool first_exclusive = !stop_inclusive;
std::vector<::dsn::apps::key_value> reverse_kvs;
while (limiter->valid() && it->Valid()) {
// check start sort key
int c = it->key().compare(start);
if (c < 0 || (c == 0 && !start_inclusive)) {
// out of range
complete = true;
break;
}
// check stop sort key
if (first_exclusive) {
first_exclusive = false;
if (it->key().compare(stop) == 0) {
// discard stop_sortkey
it->Prev();
continue;
}
}
limiter->add_count();
// extract value
auto state = append_key_value_for_multi_get(reverse_kvs,
it->key(),
it->value(),
request.sort_key_filter_type,
request.sort_key_filter_pattern,
epoch_now,
request.no_value);
switch (state) {
case range_iteration_state::kNormal: {
count++;
auto &kv = reverse_kvs.back();
uint64_t kv_size = kv.key.length() + kv.value.length();
size += kv_size;
limiter->add_size(kv_size);
} break;
case range_iteration_state::kExpired:
expire_count++;
break;
case range_iteration_state::kFiltered:
filter_count++;
break;
default:
break;
}
if (c == 0) {
// reached the stop position
complete = true;
break;
}
it->Prev();
}
if (it->status().ok() && !reverse_kvs.empty()) {
// reverse the order so that resp.kvs is ordered by sort_key
resp.kvs.reserve(reverse_kvs.size());
for (int i = reverse_kvs.size() - 1; i >= 0; i--) {
resp.kvs.emplace_back(std::move(reverse_kvs[i]));
}
}
}
iteration_count = limiter->get_iteration_count();
resp.error = it->status().code();
if (!it->status().ok()) {
// error occurred
if (_verbose_log) {
derror("%s: rocksdb scan failed for multi_get from %s: "
"hash_key = \"%s\", reverse = %s, error = %s",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(request.hash_key).c_str(),
request.reverse ? "true" : "false",
it->status().ToString().c_str());
} else {
derror("%s: rocksdb scan failed for multi_get from %s: "
"reverse = %s, error = %s",
replica_name(),
rpc.remote_address().to_string(),
request.reverse ? "true" : "false",
it->status().ToString().c_str());
}
resp.kvs.clear();
} else if (it->Valid() && !complete) {
// scan not completed
resp.error = rocksdb::Status::kIncomplete;
if (limiter->exceed_limit()) {
dwarn_replica(
"rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)",
rpc.remote_address().to_string(),
limiter->duration_time(),
limiter->max_duration_time());
}
}
} else { // condition: !request.sort_keys.empty()
bool error_occurred = false;
rocksdb::Status final_status;
bool exceed_limit = false;
std::vector<::dsn::blob> keys_holder;
std::vector<rocksdb::Slice> keys;
std::vector<std::string> values;
keys_holder.reserve(request.sort_keys.size());
keys.reserve(request.sort_keys.size());
for (auto &sort_key : request.sort_keys) {
::dsn::blob raw_key;
pegasus_generate_key(raw_key, request.hash_key, sort_key);
keys.emplace_back(raw_key.data(), raw_key.length());
keys_holder.emplace_back(std::move(raw_key));
}
std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
for (int i = 0; i < keys.size(); i++) {
rocksdb::Status &status = statuses[i];
std::string &value = values[i];
// print log
if (!status.ok()) {
if (_verbose_log) {
derror("%s: rocksdb get failed for multi_get from %s: "
"hash_key = \"%s\", sort_key = \"%s\", error = %s",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(request.hash_key).c_str(),
::pegasus::utils::c_escape_string(request.sort_keys[i]).c_str(),
status.ToString().c_str());
} else if (!status.IsNotFound()) {
derror("%s: rocksdb get failed for multi_get from %s: error = %s",
replica_name(),
rpc.remote_address().to_string(),
status.ToString().c_str());
}
}
// check ttl
if (status.ok()) {
uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
if (expire_ts > 0 && expire_ts <= epoch_now) {
expire_count++;
if (_verbose_log) {
derror("%s: rocksdb data expired for multi_get from %s",
replica_name(),
rpc.remote_address().to_string());
}
status = rocksdb::Status::NotFound();
}
}
// extract value
if (status.ok()) {
// stop if the count or size limit is exceeded
if (count >= max_kv_count || size >= max_kv_size) {
exceed_limit = true;
break;
}
::dsn::apps::key_value kv;
kv.key = request.sort_keys[i];
if (!request.no_value) {
pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
}
count++;
size += kv.key.length() + kv.value.length();
resp.kvs.emplace_back(std::move(kv));
}
// if error occurred
if (!status.ok() && !status.IsNotFound()) {
error_occurred = true;
final_status = status;
break;
}
}
if (error_occurred) {
resp.error = final_status.code();
resp.kvs.clear();
} else if (exceed_limit) {
resp.error = rocksdb::Status::kIncomplete;
} else {
resp.error = rocksdb::Status::kOk;
}
}
#ifdef PEGASUS_UNIT_TEST
// sleep 10ms for unit test
usleep(10 * 1000);
#endif
uint64_t time_used = dsn_now_ns() - start_time;
if (is_multi_get_abnormal(time_used, size, iteration_count)) {
dwarn_replica(
"rocksdb abnormal multi_get from {}: hash_key = {}, "
"start_sort_key = {} ({}), stop_sort_key = {} ({}), "
"sort_key_filter_type = {}, sort_key_filter_pattern = {}, "
"max_kv_count = {}, max_kv_size = {}, reverse = {}, "
"result_count = {}, result_size = {}, iteration_count = {}, "
"expire_count = {}, filter_count = {}, time_used = {} ns",
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(request.hash_key),
::pegasus::utils::c_escape_string(request.start_sortkey),
request.start_inclusive ? "inclusive" : "exclusive",
::pegasus::utils::c_escape_string(request.stop_sortkey),
request.stop_inclusive ? "inclusive" : "exclusive",
::dsn::apps::_filter_type_VALUES_TO_NAMES.find(request.sort_key_filter_type)->second,
::pegasus::utils::c_escape_string(request.sort_key_filter_pattern),
request.max_kv_count,
request.max_kv_size,
request.reverse ? "true" : "false",
count,
size,
iteration_count,
expire_count,
filter_count,
time_used);
_pfc_recent_abnormal_count->increment();
}
if (expire_count > 0) {
_pfc_recent_expire_count->add(expire_count);
}
if (filter_count > 0) {
_pfc_recent_filter_count->add(filter_count);
}
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
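// Count the sort keys under one hash key by scanning the range [hash_key, next(hash_key));
// TTL-expired records are skipped, and resp.count is set to -1 if the scan exceeds the
// iteration limits.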
void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
{
dassert(_is_open, "");
_pfc_scan_qps->increment();
uint64_t start_time = dsn_now_ns();
const auto &hash_key = rpc.request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
// scan
::dsn::blob start_key, stop_key;
pegasus_generate_key(start_key, hash_key, ::dsn::blob());
pegasus_generate_next_blob(stop_key, hash_key);
rocksdb::Slice start(start_key.data(), start_key.length());
rocksdb::Slice stop(stop_key.data(), stop_key.length());
rocksdb::ReadOptions options = _data_cf_rd_opts;
options.iterate_upper_bound = &stop;
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(options, _data_cf));
it->Seek(start);
resp.count = 0;
uint32_t epoch_now = ::pegasus::utils::epoch_now();
uint64_t expire_count = 0;
std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
0,
_rng_rd_opts.rocksdb_iteration_threshold_time_ms);
while (limiter->time_check() && it->Valid()) {
limiter->add_count();
if (check_if_record_expired(epoch_now, it->value())) {
expire_count++;
if (_verbose_log) {
derror("%s: rocksdb data expired for sortkey_count from %s",
replica_name(),
rpc.remote_address().to_string());
}
} else {
resp.count++;
}
it->Next();
}
if (expire_count > 0) {
_pfc_recent_expire_count->add(expire_count);
}
resp.error = it->status().code();
if (!it->status().ok()) {
// error occurred
if (_verbose_log) {
derror("%s: rocksdb scan failed for sortkey_count from %s: "
"hash_key = \"%s\", error = %s",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(hash_key).c_str(),
it->status().ToString().c_str());
} else {
derror("%s: rocksdb scan failed for sortkey_count from %s: error = %s",
replica_name(),
rpc.remote_address().to_string(),
it->status().ToString().c_str());
}
resp.count = 0;
} else if (limiter->exceed_limit()) {
dwarn_replica("rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)",
rpc.remote_address().to_string(),
limiter->duration_time(),
limiter->max_duration_time());
resp.count = -1;
}
_cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error, hash_key);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}
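// Return the remaining TTL of a key in seconds via resp.ttl_seconds, or -1 if the record
// has no TTL; an expired record is reported as not-found.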
void pegasus_server_impl::on_ttl(ttl_rpc rpc)
{
dassert(_is_open, "");
const auto &key = rpc.request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
rocksdb::Slice skey(key.data(), key.length());
std::string value;
rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, &value);
uint32_t expire_ts = 0;
uint32_t now_ts = ::pegasus::utils::epoch_now();
if (status.ok()) {
expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
if (check_if_ts_expired(now_ts, expire_ts)) {
_pfc_recent_expire_count->increment();
if (_verbose_log) {
derror("%s: rocksdb data expired for ttl from %s",
replica_name(),
rpc.remote_address().to_string());
}
status = rocksdb::Status::NotFound();
}
}
if (!status.ok()) {
if (_verbose_log) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
derror("%s: rocksdb get failed for ttl from %s: "
"hash_key = \"%s\", sort_key = \"%s\", error = %s",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(hash_key).c_str(),
::pegasus::utils::c_escape_string(sort_key).c_str(),
status.ToString().c_str());
} else if (!status.IsNotFound()) {
derror("%s: rocksdb get failed for ttl from %s: error = %s",
replica_name(),
rpc.remote_address().to_string(),
status.ToString().c_str());
}
}
resp.error = status.code();
if (status.ok()) {
if (expire_ts > 0) {
resp.ttl_seconds = expire_ts - now_ts;
} else {
// no ttl
resp.ttl_seconds = -1;
}
}
_cu_calculator->add_ttl_cu(rpc.dsn_request(), resp.error, key);
}
void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
{
dassert(_is_open, "");
_pfc_scan_qps->increment();
uint64_t start_time = dsn_now_ns();
const auto &request = rpc.request();
dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
if (!is_filter_type_supported(request.hash_key_filter_type)) {
derror("%s: invalid argument for get_scanner from %s: "
"hash key filter type %d not supported",
replica_name(),
rpc.remote_address().to_string(),
request.hash_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
return;
}
if (!is_filter_type_supported(request.sort_key_filter_type)) {
derror("%s: invalid argument for get_scanner from %s: "
"sort key filter type %d not supported",
replica_name(),
rpc.remote_address().to_string(),
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
return;
}
rocksdb::ReadOptions rd_opts(_data_cf_rd_opts);
if (_data_cf_opts.prefix_extractor) {
::dsn::blob start_hash_key, tmp;
pegasus_restore_key(request.start_key, start_hash_key, tmp);
if (start_hash_key.size() == 0) {
// The hash_key is not passed; this only happens when doing a full scan (scanners
// obtained via get_unordered_scanners) on a partition, so we have to do a
// total-order seek on rocksdb.
rd_opts.total_order_seek = true;
rd_opts.prefix_same_as_start = false;
}
}
bool start_inclusive = request.start_inclusive;
bool stop_inclusive = request.stop_inclusive;
rocksdb::Slice start(request.start_key.data(), request.start_key.length());
rocksdb::Slice stop(request.stop_key.data(), request.stop_key.length());
// limit the key range by the prefix filter.
// Because data is not ordered by hash key (e.g. hash key "aa" is greater than "b" in the
// encoded key space), we can only limit the start of the range by the hash key filter.
::dsn::blob prefix_start_key;
if (request.hash_key_filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX &&
request.hash_key_filter_pattern.length() > 0) {
pegasus_generate_key(prefix_start_key, request.hash_key_filter_pattern, ::dsn::blob());
rocksdb::Slice prefix_start(prefix_start_key.data(), prefix_start_key.length());
if (prefix_start.compare(start) > 0) {
start = prefix_start;
start_inclusive = true;
// Now 'start' is generated from 'request.hash_key_filter_pattern', which may not be a
// real hash key, so we must not seek this prefix via the prefix bloom filter. However,
// this only happens when doing a full scan (scanners obtained via get_unordered_scanners),
// in which case the following flags have already been updated.
dassert(!_data_cf_opts.prefix_extractor || rd_opts.total_order_seek, "Invalid option");
dassert(!_data_cf_opts.prefix_extractor || !rd_opts.prefix_same_as_start,
"Invalid option");
}
}
// check if range is empty
int c = start.compare(stop);
if (c > 0 || (c == 0 && (!start_inclusive || !stop_inclusive))) {
// empty key range
if (_verbose_log) {
dwarn("%s: empty key range for get_scanner from %s: "
"start_key = \"%s\" (%s), stop_key = \"%s\" (%s)",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(request.start_key).c_str(),
request.start_inclusive ? "inclusive" : "exclusive",
::pegasus::utils::c_escape_string(request.stop_key).c_str(),
request.stop_inclusive ? "inclusive" : "exclusive");
}
resp.error = rocksdb::Status::kOk;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
return;
}
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rd_opts, _data_cf));
it->Seek(start);
bool complete = false;
bool first_exclusive = !start_inclusive;
uint32_t epoch_now = ::pegasus::utils::epoch_now();
uint64_t expire_count = 0;
uint64_t filter_count = 0;
int32_t count = 0;
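// The effective batch size is the client-requested batch_size (unbounded if <= 0), capped
// by the server-side rocksdb_max_iteration_count limit.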
uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX;
uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
resp.kvs.reserve(batch_count);
bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
while (limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
// out of range
complete = true;
break;
}
if (first_exclusive) {
first_exclusive = false;
if (it->key().compare(start) == 0) {
// discard start_sortkey
it->Next();
continue;
}
}
limiter->add_count();
auto state = append_key_value_for_scan(
resp.kvs,
it->key(),
it->value(),
request.hash_key_filter_type,
request.hash_key_filter_pattern,
request.sort_key_filter_type,
request.sort_key_filter_pattern,
epoch_now,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts);
switch (state) {
case range_iteration_state::kNormal:
count++;
break;
case range_iteration_state::kExpired:
expire_count++;
break;
case range_iteration_state::kFiltered:
filter_count++;
break;
default:
break;
}
if (c == 0) {
// reached the stop position
complete = true;
break;
}
it->Next();
}
// check whether the iteration time exceeded the limit
if (!complete) {
limiter->time_check_after_incomplete_scan();
}
resp.error = it->status().code();
if (!it->status().ok()) {
// error occurred
if (_verbose_log) {
derror("%s: rocksdb scan failed for get_scanner from %s: "
"start_key = \"%s\" (%s), stop_key = \"%s\" (%s), "
"batch_size = %d, read_count = %d, error = %s",
replica_name(),
rpc.remote_address().to_string(),
::pegasus::utils::c_escape_string(start).c_str(),
request.start_inclusive ? "inclusive" : "exclusive",
::pegasus::utils::c_escape_string(stop).c_str(),
request.stop_inclusive ? "inclusive" : "exclusive",
batch_count,
count,
it->status().ToString().c_str());
} else {
derror("%s: rocksdb scan failed for get_scanner from %s: error = %s",
replica_name(),
rpc.remote_address().to_string(),
it->status().ToString().c_str());
}
resp.kvs.clear();
} else if (limiter->exceed_limit()) {
// scan exceeded the time limit
resp.error = rocksdb::Status::kIncomplete;
dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS "
"time_threshold_ns({})",
rpc.remote_address().to_string(),
batch_count,
limiter->duration_time(),
limiter->max_duration_time());
} else if (it->Valid() && !complete) {
// scan not completed
std::unique_ptr<pegasus_scan_context> context(new pegasus_scan_context(
std::move(it),
std::string(stop.data(), stop.size()),
request.stop_inclusive,
request.hash_key_filter_type,
std::string(request.hash_key_filter_pattern.data(),
request.hash_key_filter_pattern.length()),
request.sort_key_filter_type,
std::string(request.sort_key_filter_pattern.data(),
request.sort_key_filter_pattern.length()),
batch_count,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts));
int64_t handle = _context_cache.put(std::move(context));
resp.context_id = handle;
// if the context is used again, it will be fetched and re-inserted into the cache,
// which changes the handle; the delayed task will then fetch a null context via the
// old handle and do nothing.
::dsn::tasking::enqueue(LPC_PEGASUS_SERVER_DELAY,
&_tracker,
[this, handle]() { _context_cache.fetch(handle); },
0,
std::chrono::minutes(5));
} else {
// scan completed
resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
}
if (expire_count > 0) {
_pfc_recent_expire_count->add(expire_count);
}
if (filter_count > 0) {
_pfc_recent_filter_count->add(filter_count);
}
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}
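// Continue a previously created scanner identified by request.context_id. The context is
// removed from the cache while in use; if the scan is still incomplete, it is re-inserted
// under a new handle, which is returned to the client as the next context_id.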
void pegasus_server_impl::on_scan(scan_rpc rpc)
{
dassert(_is_open, "");
_pfc_scan_qps->increment();
uint64_t start_time = dsn_now_ns();
const auto &request = rpc.request();
dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
if (context) {
rocksdb::Iterator *it = context->iterator.get();
const rocksdb::Slice &stop = context->stop;
bool stop_inclusive = context->stop_inclusive;
::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type;
const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
bool no_value = context->no_value;
bool validate_hash = context->validate_partition_hash;
bool return_expire_ts = context->return_expire_ts;
bool complete = false;
uint32_t epoch_now = ::pegasus::utils::epoch_now();
uint64_t expire_count = 0;
uint64_t filter_count = 0;
int32_t count = 0;
uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX;
uint32_t batch_count =
std::min(context_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
while (limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
// out of range
complete = true;
break;
}
limiter->add_count();
auto state = append_key_value_for_scan(resp.kvs,
it->key(),
it->value(),
hash_key_filter_type,
hash_key_filter_pattern,
sort_key_filter_type,
sort_key_filter_pattern,
epoch_now,
no_value,
validate_hash,
return_expire_ts);
switch (state) {
case range_iteration_state::kNormal:
count++;
break;
case range_iteration_state::kExpired:
expire_count++;
break;
case range_iteration_state::kFiltered:
filter_count++;
break;
default:
break;
}
if (c == 0) {
// reached the stop position
complete = true;
break;
}
it->Next();
}
// check whether the iteration time exceeded the limit
if (!complete) {
limiter->time_check_after_incomplete_scan();
}
resp.error = it->status().code();
if (!it->status().ok()) {
// error occurred
if (_verbose_log) {
derror("%s: rocksdb scan failed for scan from %s: "
"context_id= %" PRId64 ", stop_key = \"%s\" (%s), "
"batch_size = %d, read_count = %d, error = %s",
replica_name(),
rpc.remote_address().to_string(),
request.context_id,
::pegasus::utils::c_escape_string(stop).c_str(),
stop_inclusive ? "inclusive" : "exclusive",
batch_count,
count,
it->status().ToString().c_str());
} else {
derror("%s: rocksdb scan failed for scan from %s: error = %s",
replica_name(),
rpc.remote_address().to_string(),
it->status().ToString().c_str());
}
resp.kvs.clear();
} else if (limiter->exceed_limit()) {
// scan exceeded the time limit
resp.error = rocksdb::Status::kIncomplete;
dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used({}ns) VS "
"time_threshold({}ns)",
rpc.remote_address().to_string(),
batch_count,
limiter->duration_time(),
limiter->max_duration_time());
} else if (it->Valid() && !complete) {
// scan not completed
int64_t handle = _context_cache.put(std::move(context));
resp.context_id = handle;
::dsn::tasking::enqueue(LPC_PEGASUS_SERVER_DELAY,
&_tracker,
[this, handle]() { _context_cache.fetch(handle); },
0,
std::chrono::minutes(5));
} else {
// scan completed
resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
}
if (expire_count > 0) {
_pfc_recent_expire_count->add(expire_count);
}
if (filter_count > 0) {
_pfc_recent_filter_count->add(filter_count);
}
} else {
resp.error = rocksdb::Status::Code::kNotFound;
}
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}
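// Clearing a scanner simply drops its cached context: fetch() removes the entry from the
// cache, and the returned unique_ptr destroys the iterator when it goes out of scope.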
void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); }
::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
{
dassert_replica(!_is_open, "replica is already opened.");
ddebug_replica("start to open app {}", data_dir());
// parse envs for parameters
// envs are composed in the replication_app_base::open() function
std::map<std::string, std::string> envs;
if (argc > 0) {
if ((argc - 1) % 2 != 0) {
derror_replica("parse envs failed, invalid argc = {}", argc);
return ::dsn::ERR_INVALID_PARAMETERS;
}
if (argv == nullptr) {
derror_replica("parse envs failed, invalid argv = nullptr");
return ::dsn::ERR_INVALID_PARAMETERS;
}
int idx = 1;
while (idx < argc) {
const char *key = argv[idx++];
const char *value = argv[idx++];
envs.emplace(key, value);
}
}
// Update all envs before opening db, ensure all envs are effective for the newly opened db.
update_app_envs_before_open_db(envs);
// TODO(yingchun): refactor the following code
//
// here, we must distinguish three cases:
// case 1: we open a db that already exists
// case 2: we open a new db
// case 3: we restore the db based on old data
//
// we restore the db based on old data only when all of the restore preconditions are
// satisfied:
// 1. the rdb directory does not exist
// 2. we can parse the restore info from app envs, which is stored in argv
// 3. the restore_dir exists
//
bool db_exist = true;
auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
if (::dsn::utils::filesystem::path_exists(path)) {
// only case 1
ddebug("%s: rdb is already exist, path = %s", replica_name(), path.c_str());
} else {
std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
const std::string &restore_dir = restore_info.first;
bool force_restore = restore_info.second;
if (restore_dir.empty()) {
// case 2
if (force_restore) {
derror("%s: try to restore, but we can't combine restore_dir from envs",
replica_name());
return ::dsn::ERR_FILE_OPERATION_FAILED;
} else {
db_exist = false;
dinfo("%s: open a new db, path = %s", replica_name(), path.c_str());
}
} else {
// case 3
ddebug("%s: try to restore from restore_dir = %s", replica_name(), restore_dir.c_str());
if (::dsn::utils::filesystem::directory_exists(restore_dir)) {
// here, we just rename restore_dir to rdb, then continue the normal process
if (::dsn::utils::filesystem::rename_path(restore_dir.c_str(), path.c_str())) {
ddebug("%s: rename restore_dir(%s) to rdb(%s) succeed",
replica_name(),
restore_dir.c_str(),
path.c_str());
} else {
derror("%s: rename restore_dir(%s) to rdb(%s) failed",
replica_name(),
restore_dir.c_str(),
path.c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
} else {
if (force_restore) {
derror("%s: try to restore, but restore_dir isn't exist, restore_dir = %s",
replica_name(),
restore_dir.c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
} else {
db_exist = false;
dwarn(
"%s: try to restore but restore_dir(%s) doesn't exist; since restore is not "
"forced, the role of this replica must not be primary, so we open a new db on "
"the path(%s)",
replica_name(),
restore_dir.c_str(),
path.c_str());
}
}
}
}
ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
// Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// will be used elsewhere.
rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
bool has_incompatible_db_options = false;
if (db_exist) {
// When DB exists, meta CF and data CF must be present.
bool missing_meta_cf = true;
bool missing_data_cf = true;
if (check_column_families(path, &missing_meta_cf, &missing_data_cf) != ::dsn::ERR_OK) {
derror_replica("check column families failed");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
dassert_replica(!missing_data_cf, "Missing data column family");
// Load latest options from option file stored in the db directory.
rocksdb::DBOptions loaded_db_opt;
std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
// Set `ignore_unknown_options` true for forward compatibility.
auto status = rocksdb::LoadLatestOptions(path,
rocksdb::Env::Default(),
&loaded_db_opt,
&loaded_cf_descs,
/*ignore_unknown_options=*/true);
if (!status.ok()) {
// Here we ignore an invalid-argument error related to the `pegasus_data_version` and
// `pegasus_data` options, which were used by old-version rocksdbs (before 2.1.0).
if (status.code() != rocksdb::Status::kInvalidArgument ||
status.ToString().find("pegasus_data") == std::string::npos) {
derror_replica("load latest option file failed: {}.", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
has_incompatible_db_options = true;
dwarn_replica("The latest option file has incompatible db options: {}, use default "
"options to open db.",
status.ToString());
}
if (!has_incompatible_db_options) {
for (int i = 0; i < loaded_cf_descs.size(); ++i) {
if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
loaded_data_cf_opts = loaded_cf_descs[i].options;
}
}
// Reset usage scenario related options according to loaded_data_cf_opts.
// We don't use `loaded_data_cf_opts` directly because pointer-typed options will
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
}
} else {
// When creating a new DB, we have to create a new column family to store meta data (the
// meta column family).
_db_opts.create_missing_column_families = true;
}
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
auto s = rocksdb::CheckOptionsCompatibility(
path, rocksdb::Env::Default(), _db_opts, column_families, /*ignore_unknown_options=*/true);
if (!s.ok() && !s.IsNotFound() && !has_incompatible_db_options) {
derror_replica("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
if (!status.ok()) {
derror_replica("rocksdb::DB::Open failed, error = {}", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
dcheck_eq_replica(2, handles_opened.size());
dcheck_eq_replica(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
dcheck_eq_replica(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
_data_cf = handles_opened[0];
_meta_cf = handles_opened[1];
// Create _meta_store, which provides read and write access to Pegasus meta data.
_meta_store = dsn::make_unique<meta_store>(this, _db, _meta_cf);
if (db_exist) {
_last_committed_decree = _meta_store->get_last_flushed_decree();
_pegasus_data_version = _meta_store->get_data_version();
_usage_scenario = _meta_store->get_usage_scenario();
uint64_t last_manual_compact_finish_time =
_meta_store->get_last_manual_compact_finish_time();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
release_db();
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
// update last manual compact finish timestamp
_manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
} else {
// Write initial meta data to the meta CF and flush it when creating a new DB.
_meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
_meta_store->set_last_flushed_decree(0);
_meta_store->set_last_manual_compact_finish_time(0);
flush_all_family_columns(true);
}
// only enable the filter after the correct pegasus_data_version has been set
_key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
_key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
_key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index() - 1);
_key_ttl_compaction_filter_factory->EnableFilter();
parse_checkpoints();
// checkpoint if necessary to make last_durable_decree() fresh.
// an async checkpoint is enough because we are sure the memtable is empty now.
int64_t last_flushed = static_cast<int64_t>(_last_committed_decree);
if (last_flushed != last_durable_decree()) {
ddebug_replica(
"start to do async checkpoint, last_durable_decree = {}, last_flushed_decree = {}",
last_durable_decree(),
last_flushed);
auto err = async_checkpoint(false);
if (err != ::dsn::ERR_OK) {
derror_replica("create checkpoint failed, error = {}", err.to_string());
release_db();
return err;
}
dcheck_eq_replica(last_flushed, last_durable_decree());
}
ddebug_replica("open app succeed, pegasus_data_version = {}, last_durable_decree = {}",
_pegasus_data_version,
last_durable_decree());
_is_open = true;
if (!db_exist) {
// When creating a new db, update the usage scenario according to app envs.
update_usage_scenario(envs);
}
dinfo_replica("start the update replica-level rocksdb statistics timer task");
_update_replica_rdb_stat =
::dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
&_tracker,
[this]() { this->update_replica_rocksdb_statistics(); },
_update_rdb_stat_interval);
// These counters are server-level singletons shared by all replicas; their metrics update
// task should be scheduled once per interval from the server's point of view.
static std::once_flag flag;
std::call_once(flag, [&]() {
// The timer task keeps running even when there are no replicas
dassert_f(kServerStatUpdateTimeSec.count() != 0,
"kServerStatUpdateTimeSec shouldn't be zero");
_update_server_rdb_stat = ::dsn::tasking::enqueue_timer(
LPC_REPLICATION_LONG_COMMON,
nullptr, // TODO: the tracker is nullptr, we will fix it later
[]() { update_server_rocksdb_statistics(); },
kServerStatUpdateTimeSec);
});
// initialize the cu calculator and write service after the server is initialized.
_cu_calculator = dsn::make_unique<capacity_unit_calculator>(
this, _read_hotkey_collector, _write_hotkey_collector);
_server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
&_tracker,
[this]() { _read_hotkey_collector->analyse_data(); },
std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
&_tracker,
[this]() { _write_hotkey_collector->analyse_data(); },
std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
return ::dsn::ERR_OK;
}
void pegasus_server_impl::cancel_background_work(bool wait)
{
if (_is_open) {
dassert(_db != nullptr, "");
rocksdb::CancelAllBackgroundWork(_db, wait);
}
}
::dsn::error_code pegasus_server_impl::stop(bool clear_state)
{
if (!_is_open) {
dassert(_db == nullptr, "");
dassert(!clear_state, "should not be here if do clear");
return ::dsn::ERR_OK;
}
if (!clear_state) {
flush_all_family_columns(true);
}
// stop all tracked tasks when pegasus server is stopped.
if (_update_replica_rdb_stat != nullptr) {
_update_replica_rdb_stat->cancel(true);
_update_replica_rdb_stat = nullptr;
}
if (_update_server_rdb_stat != nullptr) {
_update_server_rdb_stat->cancel(true);
_update_server_rdb_stat = nullptr;
}
_tracker.cancel_outstanding_tasks();
_context_cache.clear();
_is_open = false;
release_db();
std::deque<int64_t> reserved_checkpoints;
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
std::swap(reserved_checkpoints, _checkpoints);
set_last_durable_decree(0);
}
if (clear_state) {
// when cleaning the data dir, clean the checkpoints first.
// otherwise, if "rdb" is removed but the checkpoints remain,
// the storage engine can't be opened again
for (auto iter = reserved_checkpoints.begin(); iter != reserved_checkpoints.end(); ++iter) {
std::string chkpt_path =
dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(*iter));
if (!dsn::utils::filesystem::remove_path(chkpt_path)) {
derror("%s: rmdir %s failed when stop app", replica_name(), chkpt_path.c_str());
}
}
if (!dsn::utils::filesystem::remove_path(data_dir())) {
derror("%s: rmdir %s failed when stop app", replica_name(), data_dir().c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
_pfc_rdb_sst_count->set(0);
_pfc_rdb_sst_size->set(0);
_pfc_rdb_block_cache_hit_count->set(0);
_pfc_rdb_block_cache_total_count->set(0);
_pfc_rdb_block_cache_mem_usage->set(0);
_pfc_rdb_index_and_filter_blocks_mem_usage->set(0);
_pfc_rdb_memtable_mem_usage->set(0);
}
ddebug(
"%s: close app succeed, clear_state = %s", replica_name(), clear_state ? "true" : "false");
return ::dsn::ERR_OK;
}
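// RAII helper that tries to atomically acquire the per-replica "checkpointing" token, so
// that at most one checkpoint operation runs at a time. If the token is already held,
// token_got() returns false and the caller should give up with ERR_WRONG_TIMING.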
class CheckpointingTokenHelper
{
public:
CheckpointingTokenHelper(std::atomic_bool &flag) : _flag(flag)
{
bool expected = false;
_token_got = _flag.compare_exchange_strong(expected, true);
}
~CheckpointingTokenHelper()
{
if (_token_got)
_flag.store(false);
}
bool token_got() const { return _token_got; }
private:
std::atomic_bool &_flag;
bool _token_got;
};
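// Synchronously create a checkpoint at last_committed_decree. The memtable is always
// flushed first (log_size_for_flush = 0), so the resulting checkpoint is complete.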
::dsn::error_code pegasus_server_impl::sync_checkpoint()
{
CheckpointingTokenHelper token_helper(_is_checkpointing);
if (!token_helper.token_got())
return ::dsn::ERR_WRONG_TIMING;
int64_t last_durable = last_durable_decree();
int64_t last_commit = last_committed_decree();
dcheck_le_replica(last_durable, last_commit);
// case 1: last_durable == last_commit
// no need to do checkpoint
if (last_durable == last_commit) {
ddebug_replica(
"no need to do checkpoint because last_durable_decree = last_committed_decree = {}",
last_durable);
return ::dsn::ERR_OK;
}
// case 2: last_durable < last_commit
// need to do checkpoint
rocksdb::Checkpoint *chkpt_raw = nullptr;
auto status = rocksdb::Checkpoint::Create(_db, &chkpt_raw);
if (!status.ok()) {
derror_replica("create Checkpoint object failed, error = {}", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::unique_ptr<rocksdb::Checkpoint> chkpt(chkpt_raw);
auto dir = chkpt_get_dir_name(last_commit);
auto checkpoint_dir = ::dsn::utils::filesystem::path_combine(data_dir(), dir);
if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
ddebug_replica("checkpoint directory {} is already existed, remove it first",
checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}
// log_size_for_flush = 0 means always flush memtable before recording the live files
status = chkpt->CreateCheckpoint(checkpoint_dir, 0 /* log_size_for_flush */);
if (!status.ok()) {
// sometimes checkpointing may fail, and trying again may succeed
derror_replica("CreateCheckpoint failed, error = {}, try again", status.ToString());
// TODO(yingchun): fail and return
status = chkpt->CreateCheckpoint(checkpoint_dir, 0);
}
if (!status.ok()) {
derror_replica("CreateCheckpoint failed, error = {}", status.ToString());
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
}
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
dcheck_gt_replica(last_commit, last_durable_decree());
int64_t last_flushed = static_cast<int64_t>(_meta_store->get_last_flushed_decree());
dcheck_eq_replica(last_commit, last_flushed);
if (!_checkpoints.empty()) {
dcheck_gt_replica(last_commit, _checkpoints.back());
}
_checkpoints.push_back(last_commit);
set_last_durable_decree(_checkpoints.back());
}
ddebug_replica("sync create checkpoint succeed, last_durable_decree = {}",
last_durable_decree());
gc_checkpoints();
return ::dsn::ERR_OK;
}
// Must be thread safe.
::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable)
{
CheckpointingTokenHelper token_helper(_is_checkpointing);
if (!token_helper.token_got())
return ::dsn::ERR_WRONG_TIMING;
int64_t last_durable = last_durable_decree();
int64_t last_flushed = static_cast<int64_t>(_meta_store->get_last_flushed_decree());
int64_t last_commit = last_committed_decree();
dcheck_le_replica(last_durable, last_flushed);
dcheck_le_replica(last_flushed, last_commit);
// case 1: last_durable == last_flushed == last_commit
// no need to do checkpoint
if (last_durable == last_commit) {
dcheck_eq_replica(last_durable, last_flushed);
dcheck_eq_replica(last_flushed, last_commit);
ddebug_replica(
"no need to checkpoint because last_durable_decree = last_committed_decree = {}",
last_durable);
return ::dsn::ERR_OK;
}
// case 2: last_durable == last_flushed < last_commit
// no need to do checkpoint, but need to flush memtable if required
if (last_durable == last_flushed) {
dcheck_lt_replica(last_flushed, last_commit);
if (!flush_memtable) {
// no flush required
return ::dsn::ERR_OK;
}
// flush required, but not wait
if (::dsn::ERR_OK == flush_all_family_columns(false)) {
ddebug_replica("trigger flushing memtable succeed");
return ::dsn::ERR_TRY_AGAIN;
} else {
derror_replica("trigger flushing memtable failed");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
}
// case 3: last_durable < last_flushed <= last_commit
// need to do checkpoint
dcheck_lt_replica(last_durable, last_flushed);
std::string tmp_dir = ::dsn::utils::filesystem::path_combine(
data_dir(), std::string("checkpoint.tmp.") + std::to_string(dsn_now_us()));
if (::dsn::utils::filesystem::directory_exists(tmp_dir)) {
ddebug_replica("temporary checkpoint directory {} is already existed, remove it first",
tmp_dir);
if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
derror_replica("remove temporary checkpoint directory {} failed", tmp_dir);
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}
int64_t checkpoint_decree = 0;
::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree);
if (err != ::dsn::ERR_OK) {
derror_replica("copy_checkpoint_to_dir_unsafe failed with err = {}", err.to_string());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
auto checkpoint_dir =
::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(checkpoint_decree));
if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
ddebug_replica("checkpoint directory {} is already existed, remove it first",
checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove old checkpoint directory {} failed", checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
derror_replica("remove temporary checkpoint directory {} failed", tmp_dir);
}
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}
if (!::dsn::utils::filesystem::rename_path(tmp_dir, checkpoint_dir)) {
derror_replica("rename checkpoint directory from {} to {} failed", tmp_dir, checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(tmp_dir)) {
derror_replica("remove temporary checkpoint directory {} failed", tmp_dir);
}
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
dcheck_gt_replica(checkpoint_decree, last_durable_decree());
if (!_checkpoints.empty()) {
dcheck_gt_replica(checkpoint_decree, _checkpoints.back());
}
_checkpoints.push_back(checkpoint_decree);
set_last_durable_decree(_checkpoints.back());
}
ddebug_replica("async create checkpoint succeed, last_durable_decree = {}",
last_durable_decree());
gc_checkpoints();
return ::dsn::ERR_OK;
}
// Must be thread safe.
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree,
bool flush_memtable)
{
CheckpointingTokenHelper token_helper(_is_checkpointing);
if (!token_helper.token_got()) {
return ::dsn::ERR_WRONG_TIMING;
}
return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree, flush_memtable);
}
// not thread safe, should be protected by caller
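// Creates a rocksdb checkpoint (hard-linking SST files where the filesystem
// allows) under checkpoint_dir. If checkpoint_decree is non-null, the freshly
// created checkpoint is reopened read-only and the last flushed decree is read
// back from its meta column family, so the caller learns exactly which decree
// the checkpoint covers.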
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
int64_t *checkpoint_decree,
bool flush_memtable)
{
rocksdb::Checkpoint *chkpt_raw = nullptr;
auto status = rocksdb::Checkpoint::Create(_db, &chkpt_raw);
if (!status.ok()) {
derror_replica("create Checkpoint object failed, error = {}", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::unique_ptr<rocksdb::Checkpoint> chkpt(chkpt_raw);
if (::dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
ddebug_replica("checkpoint directory {} is already existed, remove it first",
checkpoint_dir);
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
}
// CreateCheckpoint() will not flush memtable when log_size_for_flush = max
status = chkpt->CreateCheckpoint(checkpoint_dir,
flush_memtable ? 0 : std::numeric_limits<uint64_t>::max());
if (!status.ok()) {
derror_replica("CreateCheckpoint failed, error = {}", status.ToString());
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
}
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
ddebug_replica("copy checkpoint to dir({}) succeed", checkpoint_dir);
if (checkpoint_decree != nullptr) {
rocksdb::DB *snapshot_db = nullptr;
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
auto cleanup = [&](bool remove_checkpoint) {
if (remove_checkpoint && !::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
}
if (snapshot_db) {
for (auto handle : handles_opened) {
if (handle) {
snapshot_db->DestroyColumnFamilyHandle(handle);
handle = nullptr;
}
}
delete snapshot_db;
snapshot_db = nullptr;
}
};
// Because of RocksDB's restriction, we have to open the default column family
// even though we don't use it
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()},
{META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}});
status = rocksdb::DB::OpenForReadOnly(
rocksdb::DBOptions(), checkpoint_dir, column_families, &handles_opened, &snapshot_db);
if (!status.ok()) {
derror_replica(
"OpenForReadOnly from {} failed, error = {}", checkpoint_dir, status.ToString());
snapshot_db = nullptr;
cleanup(true);
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
dcheck_eq_replica(handles_opened.size(), 2);
dcheck_eq_replica(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
uint64_t last_flushed_decree =
_meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]);
*checkpoint_decree = last_flushed_decree;
cleanup(false);
}
return ::dsn::ERR_OK;
}
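// Returns the file list of the latest durable checkpoint, used as the learn
// state when another replica needs to copy this replica's data.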
::dsn::error_code pegasus_server_impl::get_checkpoint(int64_t learn_start,
const dsn::blob &learn_request,
dsn::replication::learn_state &state)
{
dassert(_is_open, "");
int64_t ci = last_durable_decree();
if (ci == 0) {
derror("%s: no checkpoint found", replica_name());
return ::dsn::ERR_OBJECT_NOT_FOUND;
}
auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(ci));
state.files.clear();
if (!::dsn::utils::filesystem::get_subfiles(chkpt_dir, state.files, true)) {
derror("%s: list files in checkpoint dir %s failed", replica_name(), chkpt_dir.c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
state.from_decree_excluded = 0;
state.to_decree_included = ci;
ddebug("%s: get checkpoint succeed, from_decree_excluded = 0, to_decree_included = %" PRId64 "",
replica_name(),
state.to_decree_included);
return ::dsn::ERR_OK;
}
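// Applies a learned checkpoint in one of two modes:
// - chkpt_apply_mode::copy: rename the learn directory into a regular
//   checkpoint directory and advance last_durable_decree, leaving the running
//   db untouched;
// - otherwise: stop the db, wipe the data directory, move the learned files to
//   data_dir()/rdb (or start empty if no files were learned), and reopen.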
::dsn::error_code
pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode,
const dsn::replication::learn_state &state)
{
::dsn::error_code err;
int64_t ci = state.to_decree_included;
if (mode == chkpt_apply_mode::copy) {
dassert(ci > last_durable_decree(),
"state.to_decree_included(%" PRId64 ") <= last_durable_decree(%" PRId64 ")",
ci,
last_durable_decree());
auto learn_dir = ::dsn::utils::filesystem::remove_file_name(state.files[0]);
auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), chkpt_get_dir_name(ci));
if (::dsn::utils::filesystem::rename_path(learn_dir, chkpt_dir)) {
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
dassert(ci > last_durable_decree(),
"%" PRId64 " VS %" PRId64 "",
ci,
last_durable_decree());
if (!_checkpoints.empty()) {
dassert(ci > _checkpoints.back(),
"%" PRId64 " VS %" PRId64 "",
ci,
_checkpoints.back());
}
_checkpoints.push_back(ci);
set_last_durable_decree(ci);
err = ::dsn::ERR_OK;
} else {
derror("%s: rename directory %s to %s failed",
replica_name(),
learn_dir.c_str(),
chkpt_dir.c_str());
err = ::dsn::ERR_FILE_OPERATION_FAILED;
}
return err;
}
if (_is_open) {
err = stop(true);
if (err != ::dsn::ERR_OK) {
derror("%s: close rocksdb %s failed, error = %s", replica_name(), err.to_string());
return err;
}
}
// clear data dir
if (!::dsn::utils::filesystem::remove_path(data_dir())) {
derror("%s: clear data directory %s failed", replica_name(), data_dir().c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
// reopen the db with the new checkpoint files
if (state.files.size() > 0) {
// create data dir
if (!::dsn::utils::filesystem::create_directory(data_dir())) {
derror("%s: create data directory %s failed", replica_name(), data_dir().c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
// move learned files from learn_dir to data_dir/rdb
std::string learn_dir = ::dsn::utils::filesystem::remove_file_name(state.files[0]);
std::string new_dir = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
if (!::dsn::utils::filesystem::rename_path(learn_dir, new_dir)) {
derror("%s: rename directory %s to %s failed",
replica_name(),
learn_dir.c_str(),
new_dir.c_str());
return ::dsn::ERR_FILE_OPERATION_FAILED;
}
err = start(0, nullptr);
} else {
ddebug("%s: apply empty checkpoint, create new rocksdb", replica_name());
err = start(0, nullptr);
}
if (err != ::dsn::ERR_OK) {
derror("%s: open rocksdb failed, error = %s", replica_name(), err.to_string());
return err;
}
dassert(_is_open, "");
dassert(ci == last_durable_decree(), "%" PRId64 " VS %" PRId64 "", ci, last_durable_decree());
ddebug("%s: apply checkpoint succeed, last_durable_decree = %" PRId64,
replica_name(),
last_durable_decree());
return ::dsn::ERR_OK;
}
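// Checks a value against a filter pattern. For example, with pattern "ab":
// FT_MATCH_PREFIX matches "abc", FT_MATCH_POSTFIX matches "cab", and
// FT_MATCH_ANYWHERE matches "xaby"; an empty pattern matches everything.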
bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_type,
const ::dsn::blob &filter_pattern,
const ::dsn::blob &value)
{
switch (filter_type) {
case ::dsn::apps::filter_type::FT_NO_FILTER:
return true;
case ::dsn::apps::filter_type::FT_MATCH_ANYWHERE:
case ::dsn::apps::filter_type::FT_MATCH_PREFIX:
case ::dsn::apps::filter_type::FT_MATCH_POSTFIX: {
if (filter_pattern.length() == 0)
return true;
if (value.length() < filter_pattern.length())
return false;
if (filter_type == ::dsn::apps::filter_type::FT_MATCH_ANYWHERE) {
return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
} else if (filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX) {
return ::memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
} else { // filter_type == ::dsn::apps::filter_type::FT_MATCH_POSTFIX
return ::memcmp(value.data() + value.length() - filter_pattern.length(),
filter_pattern.data(),
filter_pattern.length()) == 0;
}
}
default:
dassert(false, "unsupported filter type: %d", filter_type);
}
return false;
}
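// Converts one rocksdb key/value into a scan response entry: expired records,
// records whose hash key no longer belongs to this partition (e.g. during
// partition split), and records rejected by the hash/sort key filters are
// skipped, with the returned state telling the caller why.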
range_iteration_state
pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type hash_key_filter_type,
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
bool request_expire_ts)
{
if (check_if_record_expired(epoch_now, value)) {
if (_verbose_log) {
derror("%s: rocksdb data expired for scan", replica_name());
}
return range_iteration_state::kExpired;
}
if (request_validate_hash && _validate_partition_hash) {
if (_partition_version < 0 || _gpid.get_partition_index() > _partition_version ||
!check_pegasus_key_hash(key, _gpid.get_partition_index(), _partition_version)) {
if (_verbose_log) {
derror_replica("not serve hash key while scan");
}
return range_iteration_state::kHashInvalid;
}
}
::dsn::apps::key_value kv;
// extract raw key
::dsn::blob raw_key(key.data(), 0, key.size());
if (hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER ||
sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(raw_key, hash_key, sort_key);
if (hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
!validate_filter(hash_key_filter_type, hash_key_filter_pattern, hash_key)) {
if (_verbose_log) {
derror("%s: hash key filtered for scan", replica_name());
}
return range_iteration_state::kFiltered;
}
if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
!validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
if (_verbose_log) {
derror("%s: sort key filtered for scan", replica_name());
}
return range_iteration_state::kFiltered;
}
}
std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());
// extract expire ts if necessary
if (request_expire_ts) {
auto expire_ts_seconds =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
}
// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
}
kvs.emplace_back(std::move(kv));
return range_iteration_state::kNormal;
}
range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool no_value)
{
if (check_if_record_expired(epoch_now, value)) {
if (_verbose_log) {
derror("%s: rocksdb data expired for multi get", replica_name());
}
return range_iteration_state::kExpired;
}
::dsn::apps::key_value kv;
// extract sort_key
::dsn::blob raw_key(key.data(), 0, key.size());
::dsn::blob hash_key, sort_key;
pegasus_restore_key(raw_key, hash_key, sort_key);
if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
!validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
if (_verbose_log) {
derror("%s: sort key filtered for multi get", replica_name());
}
return range_iteration_state::kFiltered;
}
std::shared_ptr<char> sort_key_buf(::dsn::utils::make_shared_array<char>(sort_key.length()));
::memcpy(sort_key_buf.get(), sort_key.data(), sort_key.length());
kv.key.assign(std::move(sort_key_buf), 0, sort_key.length());
// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
}
kvs.emplace_back(std::move(kv));
return range_iteration_state::kNormal;
}
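// Refreshes the per-replica perf counters from rocksdb properties and
// statistics tickers; the read-related counters are only refreshed on the
// primary.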
void pegasus_server_impl::update_replica_rocksdb_statistics()
{
std::string str_val;
uint64_t val = 0;
// Update _pfc_rdb_sst_count
for (int i = 0; i < _data_cf_opts.num_levels; ++i) {
int cur_level_count = 0;
if (_db->GetProperty(rocksdb::DB::Properties::kNumFilesAtLevelPrefix + std::to_string(i),
&str_val) &&
dsn::buf2int32(str_val, cur_level_count)) {
val += cur_level_count;
}
}
_pfc_rdb_sst_count->set(val);
dinfo_replica("_pfc_rdb_sst_count: {}", val);
// Update _pfc_rdb_sst_size
if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kTotalSstFilesSize, &str_val) &&
dsn::buf2uint64(str_val, val)) {
static uint64_t bytes_per_mb = 1U << 20U;
_pfc_rdb_sst_size->set(val / bytes_per_mb);
dinfo_replica("_pfc_rdb_sst_size: {} bytes", val);
}
// Update _pfc_rdb_write_amplification
std::map<std::string, std::string> props;
if (_db->GetMapProperty(_data_cf, "rocksdb.cfstats", &props)) {
auto write_amplification_iter = props.find("compaction.Sum.WriteAmp");
auto write_amplification = write_amplification_iter == props.end()
? 1
: std::stod(write_amplification_iter->second);
_pfc_rdb_write_amplification->set(write_amplification);
dinfo_replica("_pfc_rdb_write_amplification: {}", write_amplification);
}
// Update _pfc_rdb_index_and_filter_blocks_mem_usage
if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kEstimateTableReadersMem, &str_val) &&
dsn::buf2uint64(str_val, val)) {
_pfc_rdb_index_and_filter_blocks_mem_usage->set(val);
dinfo_replica("_pfc_rdb_index_and_filter_blocks_mem_usage: {} bytes", val);
}
// Update _pfc_rdb_memtable_mem_usage
if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kCurSizeAllMemTables, &str_val) &&
dsn::buf2uint64(str_val, val)) {
_pfc_rdb_memtable_mem_usage->set(val);
dinfo_replica("_pfc_rdb_memtable_mem_usage: {} bytes", val);
}
// Update _pfc_rdb_estimate_num_keys
// NOTE: if the same kv pair is written n times, kEstimateNumKeys counts it n
// times; compaction is needed to eliminate the duplicates
if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kEstimateNumKeys, &str_val) &&
dsn::buf2uint64(str_val, val)) {
_pfc_rdb_estimate_num_keys->set(val);
dinfo_replica("_pfc_rdb_estimate_num_keys: {}", val);
}
// the following stats are read-related, so only the primary needs to update
// them; this also ignores the `backup-request` case
if (!is_primary()) {
return;
}
// Update _pfc_rdb_read_amplification
if (FLAGS_read_amp_bytes_per_bit > 0) {
auto estimate_useful_bytes =
_statistics->getTickerCount(rocksdb::READ_AMP_ESTIMATE_USEFUL_BYTES);
if (estimate_useful_bytes) {
auto read_amplification =
_statistics->getTickerCount(rocksdb::READ_AMP_TOTAL_READ_BYTES) /
estimate_useful_bytes;
_pfc_rdb_read_amplification->set(read_amplification);
dinfo_replica("_pfc_rdb_read_amplification: {}", read_amplification);
}
}
// Update _pfc_rdb_bf_seek_negatives
auto bf_seek_negatives = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_PREFIX_USEFUL);
_pfc_rdb_bf_seek_negatives->set(bf_seek_negatives);
dinfo_replica("_pfc_rdb_bf_seek_negatives: {}", bf_seek_negatives);
// Update _pfc_rdb_bf_seek_total
auto bf_seek_total = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_PREFIX_CHECKED);
_pfc_rdb_bf_seek_total->set(bf_seek_total);
dinfo_replica("_pfc_rdb_bf_seek_total: {}", bf_seek_total);
// Update _pfc_rdb_bf_point_positive_true
auto bf_point_positive_true =
_statistics->getTickerCount(rocksdb::BLOOM_FILTER_FULL_TRUE_POSITIVE);
_pfc_rdb_bf_point_positive_true->set(bf_point_positive_true);
dinfo_replica("_pfc_rdb_bf_point_positive_true: {}", bf_point_positive_true);
// Update _pfc_rdb_bf_point_positive_total
auto bf_point_positive_total = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_FULL_POSITIVE);
_pfc_rdb_bf_point_positive_total->set(bf_point_positive_total);
dinfo_replica("_pfc_rdb_bf_point_positive_total: {}", bf_point_positive_total);
// Update _pfc_rdb_bf_point_negatives
auto bf_point_negatives = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_USEFUL);
_pfc_rdb_bf_point_negatives->set(bf_point_negatives);
dinfo_replica("_pfc_rdb_bf_point_negatives: {}", bf_point_negatives);
// Update _pfc_rdb_block_cache_hit_count and _pfc_rdb_block_cache_total_count
auto block_cache_hit = _statistics->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
_pfc_rdb_block_cache_hit_count->set(block_cache_hit);
dinfo_replica("_pfc_rdb_block_cache_hit_count: {}", block_cache_hit);
auto block_cache_miss = _statistics->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
auto block_cache_total = block_cache_hit + block_cache_miss;
_pfc_rdb_block_cache_total_count->set(block_cache_total);
dinfo_replica("_pfc_rdb_block_cache_total_count: {}", block_cache_total);
// update the memtable/l0/l1/l2andup hit counters, which break down where point lookups are served from
auto memtable_hit_count = _statistics->getTickerCount(rocksdb::MEMTABLE_HIT);
_pfc_rdb_memtable_hit_count->set(memtable_hit_count);
dinfo_replica("_pfc_rdb_memtable_hit_count: {}", memtable_hit_count);
auto memtable_miss_count = _statistics->getTickerCount(rocksdb::MEMTABLE_MISS);
auto memtable_total = memtable_hit_count + memtable_miss_count;
_pfc_rdb_memtable_total_count->set(memtable_total);
dinfo_replica("_pfc_rdb_memtable_total_count: {}", memtable_total);
auto l0_hit_count = _statistics->getTickerCount(rocksdb::GET_HIT_L0);
_pfc_rdb_l0_hit_count->set(l0_hit_count);
dinfo_replica("_pfc_rdb_l0_hit_count: {}", l0_hit_count);
auto l1_hit_count = _statistics->getTickerCount(rocksdb::GET_HIT_L1);
_pfc_rdb_l1_hit_count->set(l1_hit_count);
dinfo_replica("_pfc_rdb_l1_hit_count: {}", l1_hit_count);
auto l2andup_hit_count = _statistics->getTickerCount(rocksdb::GET_HIT_L2_AND_UP);
_pfc_rdb_l2andup_hit_count->set(l2andup_hit_count);
dinfo_replica("_pfc_rdb_l2andup_hit_count: {}", l2andup_hit_count);
}
void pegasus_server_impl::update_server_rocksdb_statistics()
{
// Update _pfc_rdb_block_cache_mem_usage
if (_s_block_cache) {
uint64_t val = _s_block_cache->GetUsage();
_pfc_rdb_block_cache_mem_usage->set(val);
}
// Update _pfc_rdb_write_limiter_rate_bytes
if (_s_rate_limiter) {
uint64_t current_total_through = _s_rate_limiter->GetTotalBytesThrough();
uint64_t through_bytes_per_sec =
(current_total_through - _rocksdb_limiter_last_total_through) /
kServerStatUpdateTimeSec.count();
_pfc_rdb_write_limiter_rate_bytes->set(through_bytes_per_sec);
_rocksdb_limiter_last_total_through = current_total_through;
}
}
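// Derives the restore directory from the app envs. For example (hypothetical
// env values), policy_name "every_day" and backup_id "1623211200" yield
// "<parent-of-data-dir>/restore.every_day.1623211200"; the bool in the result
// is true iff the force-restore env is present.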
std::pair<std::string, bool>
pegasus_server_impl::get_restore_dir_from_env(const std::map<std::string, std::string> &env_kvs)
{
std::pair<std::string, bool> res;
std::stringstream os;
os << "restore.";
auto it = env_kvs.find(ROCKSDB_ENV_RESTORE_FORCE_RESTORE);
if (it != env_kvs.end()) {
ddebug("%s: found %s in envs", replica_name(), ROCKSDB_ENV_RESTORE_FORCE_RESTORE.c_str());
res.second = true;
}
it = env_kvs.find(ROCKSDB_ENV_RESTORE_POLICY_NAME);
if (it != env_kvs.end()) {
ddebug("%s: found %s in envs: %s",
replica_name(),
ROCKSDB_ENV_RESTORE_POLICY_NAME.c_str(),
it->second.c_str());
os << it->second << ".";
} else {
return res;
}
it = env_kvs.find(ROCKSDB_ENV_RESTORE_BACKUP_ID);
if (it != env_kvs.end()) {
ddebug("%s: found %s in envs: %s",
replica_name(),
ROCKSDB_ENV_RESTORE_BACKUP_ID.c_str(),
it->second.c_str());
os << it->second;
} else {
return res;
}
std::string parent_dir = ::dsn::utils::filesystem::remove_file_name(data_dir());
res.first = ::dsn::utils::filesystem::path_combine(parent_dir, os.str());
return res;
}
void pegasus_server_impl::update_app_envs(const std::map<std::string, std::string> &envs)
{
update_usage_scenario(envs);
update_default_ttl(envs);
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}
int64_t pegasus_server_impl::last_flushed_decree() const
{
return _meta_store->get_last_flushed_decree();
}
void pegasus_server_impl::update_app_envs_before_open_db(
const std::map<std::string, std::string> &envs)
{
// we do not update the usage scenario here because it depends on an opened db.
update_default_ttl(envs);
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}
void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string, std::string> &envs)
{
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = _usage_scenario;
}
void pegasus_server_impl::update_usage_scenario(const std::map<std::string, std::string> &envs)
{
// update usage scenario
// if not specified, default is normal
auto find = envs.find(ROCKSDB_ENV_USAGE_SCENARIO_KEY);
std::string new_usage_scenario =
(find != envs.end() ? find->second : ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
if (new_usage_scenario != _usage_scenario) {
std::string old_usage_scenario = _usage_scenario;
if (set_usage_scenario(new_usage_scenario)) {
ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed",
ROCKSDB_ENV_USAGE_SCENARIO_KEY,
old_usage_scenario,
new_usage_scenario);
} else {
derror_replica("update app env[{}] from \"{}\" to \"{}\" failed",
ROCKSDB_ENV_USAGE_SCENARIO_KEY,
old_usage_scenario,
new_usage_scenario);
}
}
}
void pegasus_server_impl::update_default_ttl(const std::map<std::string, std::string> &envs)
{
auto find = envs.find(TABLE_LEVEL_DEFAULT_TTL);
if (find != envs.end()) {
int32_t ttl = 0;
if (!dsn::buf2int32(find->second, ttl) || ttl < 0) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
_server_write->set_default_ttl(static_cast<uint32_t>(ttl));
_key_ttl_compaction_filter_factory->SetDefaultTTL(static_cast<uint32_t>(ttl));
}
}
void pegasus_server_impl::update_checkpoint_reserve(const std::map<std::string, std::string> &envs)
{
int32_t count = _checkpoint_reserve_min_count_in_config;
int32_t time = _checkpoint_reserve_time_seconds_in_config;
auto find = envs.find(ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT);
if (find != envs.end()) {
if (!dsn::buf2int32(find->second, count) || count <= 0) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
}
find = envs.find(ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS);
if (find != envs.end()) {
if (!dsn::buf2int32(find->second, time) || time < 0) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
}
if (count != _checkpoint_reserve_min_count) {
ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed",
ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT,
_checkpoint_reserve_min_count,
count);
_checkpoint_reserve_min_count = count;
}
if (time != _checkpoint_reserve_time_seconds) {
ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed",
ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS,
_checkpoint_reserve_time_seconds,
time);
_checkpoint_reserve_time_seconds = time;
}
}
void pegasus_server_impl::update_slow_query_threshold(
const std::map<std::string, std::string> &envs)
{
uint64_t threshold_ns = _slow_query_threshold_ns_in_config;
auto find = envs.find(ROCKSDB_ENV_SLOW_QUERY_THRESHOLD);
if (find != envs.end()) {
// get the slow query threshold from env (the unit used in the env is ms)
uint64_t threshold_ms;
if (!dsn::buf2uint64(find->second, threshold_ms) || threshold_ms == 0) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
threshold_ns = threshold_ms * 1e6;
}
// check if the threshold has changed
if (_slow_query_threshold_ns != threshold_ns) {
ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed",
ROCKSDB_ENV_SLOW_QUERY_THRESHOLD,
_slow_query_threshold_ns,
threshold_ns);
_slow_query_threshold_ns = threshold_ns;
}
}
void pegasus_server_impl::update_rocksdb_iteration_threshold(
const std::map<std::string, std::string> &envs)
{
uint64_t threshold_ms = _rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config;
auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD_TIME_MS);
if (find != envs.end()) {
// the unit of iteration threshold from env is ms
if (!dsn::buf2uint64(find->second, threshold_ms)) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
}
if (_rng_rd_opts.rocksdb_iteration_threshold_time_ms != threshold_ms) {
ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed",
ROCKSDB_ITERATION_THRESHOLD_TIME_MS,
_rng_rd_opts.rocksdb_iteration_threshold_time_ms,
threshold_ms);
_rng_rd_opts.rocksdb_iteration_threshold_time_ms = threshold_ms;
}
}
void pegasus_server_impl::update_rocksdb_block_cache_enabled(
const std::map<std::string, std::string> &envs)
{
// the default of ReadOptions::fill_cache is true
bool cache_enabled = true;
auto find = envs.find(ROCKSDB_BLOCK_CACHE_ENABLED);
if (find != envs.end()) {
if (!dsn::buf2bool(find->second, cache_enabled)) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
}
if (_data_cf_rd_opts.fill_cache != cache_enabled) {
ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed",
ROCKSDB_BLOCK_CACHE_ENABLED,
_data_cf_rd_opts.fill_cache,
cache_enabled);
_data_cf_rd_opts.fill_cache = cache_enabled;
}
}
void pegasus_server_impl::update_validate_partition_hash(
const std::map<std::string, std::string> &envs)
{
bool new_value = false;
auto iter = envs.find(SPLIT_VALIDATE_PARTITION_HASH);
if (iter != envs.end()) {
if (!dsn::buf2bool(iter->second, new_value)) {
derror_replica("{}={} is invalid.", iter->first, iter->second);
return;
}
}
if (new_value != _validate_partition_hash) {
ddebug_replica(
"update '_validate_partition_hash' from {} to {}", _validate_partition_hash, new_value);
_validate_partition_hash = new_value;
_key_ttl_compaction_filter_factory->SetValidatePartitionHash(_validate_partition_hash);
}
}
void pegasus_server_impl::update_user_specified_compaction(
const std::map<std::string, std::string> &envs)
{
auto iter = envs.find(USER_SPECIFIED_COMPACTION);
if (dsn_unlikely(iter != envs.end() && iter->second != _user_specified_compaction)) {
_key_ttl_compaction_filter_factory->extract_user_specified_ops(iter->second);
_user_specified_compaction = iter->second;
}
}
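// Parses the per-level compression config. Two styles are accepted, e.g.
// (hypothetical values, with num_levels = 5):
//   "per_level:none,none,snappy" => {none, none, snappy, snappy, snappy},
//     the last listed type being reused for the remaining levels;
//   "zstd"                       => {none, none, zstd, zstd, zstd},
//     the old style, which only compresses levels >= 2.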
bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
{
std::vector<rocksdb::CompressionType> tmp(_data_cf_opts.num_levels, rocksdb::kNoCompression);
size_t i = config.find(COMPRESSION_HEADER);
if (i != std::string::npos) {
// New compression config style.
// 'per_level:[none|snappy|zstd|lz4],[none|snappy|zstd|lz4],...' for each level 0,1,...
// The last compression type will be used for levels not specified in the list.
std::vector<std::string> compression_types;
dsn::utils::split_args(
config.substr(COMPRESSION_HEADER.length()).c_str(), compression_types, ',');
rocksdb::CompressionType last_type = rocksdb::kNoCompression;
for (int i = 0; i < _data_cf_opts.num_levels; ++i) {
if (i < compression_types.size()) {
if (!compression_str_to_type(compression_types[i], last_type)) {
return false;
}
}
tmp[i] = last_type;
}
} else {
// Old compression config style.
// '[none|snappy|zstd|lz4]' applied to level 2 and all higher levels
rocksdb::CompressionType compression;
if (!compression_str_to_type(config, compression)) {
return false;
}
if (compression != rocksdb::kNoCompression) {
// only compress levels >= 2
// refer to ColumnFamilyOptions::OptimizeLevelStyleCompaction()
for (int i = 0; i < _data_cf_opts.num_levels; ++i) {
if (i >= 2) {
tmp[i] = compression;
}
}
}
}
compression_per_level = tmp;
return true;
}
bool pegasus_server_impl::compression_str_to_type(const std::string &compression_str,
rocksdb::CompressionType &type)
{
if (compression_str == "none") {
type = rocksdb::kNoCompression;
} else if (compression_str == "snappy") {
type = rocksdb::kSnappyCompression;
} else if (compression_str == "lz4") {
type = rocksdb::kLZ4Compression;
} else if (compression_str == "zstd") {
type = rocksdb::kZSTD;
} else {
derror_replica("Unsupported compression type: {}.", compression_str);
return false;
}
return true;
}
std::string pegasus_server_impl::compression_type_to_str(rocksdb::CompressionType type)
{
switch (type) {
case rocksdb::kNoCompression:
return "none";
case rocksdb::kSnappyCompression:
return "snappy";
case rocksdb::kLZ4Compression:
return "lz4";
case rocksdb::kZSTD:
return "zstd";
default:
derror_replica("Unsupported compression type: {}.", type);
return "<unsupported>";
}
}
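// Switches rocksdb dynamic options to match the requested usage scenario:
// the normal and prefer_write scenarios mainly tune write_buffer_size and the
// level-0 compaction trigger, while bulk_load follows
// Options::PrepareForBulkLoad() (huge level-0 triggers, auto compaction
// disabled). The new scenario is persisted in the meta column family so it
// survives a restart.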
bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
{
if (usage_scenario == _usage_scenario)
return false;
std::string old_usage_scenario = _usage_scenario;
std::unordered_map<std::string, std::string> new_options;
if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_NORMAL ||
usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE) {
if (_usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
// old usage scenario is bulk load, reset first
new_options["level0_file_num_compaction_trigger"] =
std::to_string(_data_cf_opts.level0_file_num_compaction_trigger);
new_options["level0_slowdown_writes_trigger"] =
std::to_string(_data_cf_opts.level0_slowdown_writes_trigger);
new_options["level0_stop_writes_trigger"] =
std::to_string(_data_cf_opts.level0_stop_writes_trigger);
new_options["soft_pending_compaction_bytes_limit"] =
std::to_string(_data_cf_opts.soft_pending_compaction_bytes_limit);
new_options["hard_pending_compaction_bytes_limit"] =
std::to_string(_data_cf_opts.hard_pending_compaction_bytes_limit);
new_options["disable_auto_compactions"] = "false";
new_options["max_compaction_bytes"] =
std::to_string(_data_cf_opts.max_compaction_bytes);
new_options["write_buffer_size"] = std::to_string(_data_cf_opts.write_buffer_size);
new_options["max_write_buffer_number"] =
std::to_string(_data_cf_opts.max_write_buffer_number);
}
if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) {
new_options["write_buffer_size"] =
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size));
new_options["level0_file_num_compaction_trigger"] =
std::to_string(_data_cf_opts.level0_file_num_compaction_trigger);
} else { // ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE
uint64_t buffer_size = dsn::rand::next_u64(_data_cf_opts.write_buffer_size,
_data_cf_opts.write_buffer_size * 2);
new_options["write_buffer_size"] = std::to_string(buffer_size);
uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base);
new_options["level0_file_num_compaction_trigger"] =
std::to_string(std::max(4UL, max_size / buffer_size));
}
} else if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD) {
// refer to Options::PrepareForBulkLoad()
new_options["level0_file_num_compaction_trigger"] = "1000000000";
new_options["level0_slowdown_writes_trigger"] = "1000000000";
new_options["level0_stop_writes_trigger"] = "1000000000";
new_options["soft_pending_compaction_bytes_limit"] = "0";
new_options["hard_pending_compaction_bytes_limit"] = "0";
new_options["disable_auto_compactions"] = "true";
new_options["max_compaction_bytes"] = std::to_string(static_cast<uint64_t>(1) << 60);
new_options["write_buffer_size"] =
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size * 4));
new_options["max_write_buffer_number"] =
std::to_string(std::max(_data_cf_opts.max_write_buffer_number, 6));
} else {
derror("%s: invalid usage scenario: %s", replica_name(), usage_scenario.c_str());
return false;
}
if (set_options(new_options)) {
_meta_store->set_usage_scenario(usage_scenario);
_usage_scenario = usage_scenario;
ddebug_replica(
"set usage scenario from \"{}\" to \"{}\" succeed", old_usage_scenario, usage_scenario);
return true;
} else {
derror_replica(
"set usage scenario from \"{}\" to \"{}\" failed", old_usage_scenario, usage_scenario);
return false;
}
}
void pegasus_server_impl::reset_usage_scenario_options(
const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
{
// reset usage scenario related options, refer to options set in 'set_usage_scenario' function.
target_opts->level0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger;
target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger;
target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger;
target_opts->soft_pending_compaction_bytes_limit =
base_opts.soft_pending_compaction_bytes_limit;
target_opts->hard_pending_compaction_bytes_limit =
base_opts.hard_pending_compaction_bytes_limit;
target_opts->disable_auto_compactions = base_opts.disable_auto_compactions;
target_opts->max_compaction_bytes = base_opts.max_compaction_bytes;
target_opts->write_buffer_size = base_opts.write_buffer_size;
target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
}
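// Applies a set of dynamic column-family options to the data CF via
// rocksdb::DB::SetOptions(), logging the full key=value list on both success
// and failure.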
bool pegasus_server_impl::set_options(
const std::unordered_map<std::string, std::string> &new_options)
{
if (!_is_open) {
dwarn_replica("set_options failed, db is not open");
return false;
}
std::ostringstream oss;
int i = 0;
for (auto &kv : new_options) {
if (i > 0)
oss << ",";
oss << kv.first << "=" << kv.second;
i++;
}
rocksdb::Status status = _db->SetOptions(_data_cf, new_options);
if (status == rocksdb::Status::OK()) {
ddebug("%s: rocksdb set options returns %s: {%s}",
replica_name(),
status.ToString().c_str(),
oss.str().c_str());
return true;
} else {
derror("%s: rocksdb set options returns %s: {%s}",
replica_name(),
status.ToString().c_str(),
oss.str().c_str());
return false;
}
}
::dsn::error_code pegasus_server_impl::check_column_families(const std::string &path,
bool *missing_meta_cf,
bool *missing_data_cf)
{
*missing_meta_cf = true;
*missing_data_cf = true;
std::vector<std::string> column_families;
auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families);
if (!s.ok()) {
derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", s.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
for (const auto &column_family : column_families) {
if (column_family == META_COLUMN_FAMILY_NAME) {
*missing_meta_cf = false;
} else if (column_family == DATA_COLUMN_FAMILY_NAME) {
*missing_data_cf = false;
} else {
derror_replica("unknown column family name: {}", column_family);
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
}
return ::dsn::ERR_OK;
}
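// Performs a manual compaction: flush everything first so the compaction sees
// all data, run CompactRange() on the data column family, record the finish
// time in the meta CF, then create a fresh checkpoint and GC the old ones so
// the disk space freed by the compaction is actually released.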
uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptions &options)
{
// wait for flush before compacting, so that all data gets compacted.
uint64_t start_time = dsn_now_ms();
flush_all_family_columns(true);
ddebug_replica("finish flush_all_family_columns, time_used = {} ms", dsn_now_ms() - start_time);
// do compact
ddebug_replica("start CompactRange, target_level = {}, bottommost_level_compaction = {}",
options.target_level,
options.bottommost_level_compaction == rocksdb::BottommostLevelCompaction::kForce
? "force"
: "skip");
start_time = dsn_now_ms();
auto status = _db->CompactRange(options, _data_cf, nullptr, nullptr);
auto end_time = dsn_now_ms();
ddebug_replica("finish CompactRange, status = {}, time_used = {}ms",
status.ToString(),
end_time - start_time);
_meta_store->set_last_manual_compact_finish_time(end_time);
// generate new checkpoint and remove old checkpoints, in order to release storage asap
if (!release_storage_after_manual_compact()) {
// it is possible that no new checkpoint was generated, e.g. if no data was
// written into rocksdb while the manual compact was running.
// we will retry later, and the retry will probably succeed because the
// periodic group check writes at least some empty records into rocksdb.
ddebug_replica("release storage failed after manual compact, will retry after 5 minutes");
::dsn::tasking::enqueue(LPC_PEGASUS_SERVER_DELAY,
&_tracker,
[this]() {
ddebug_replica("retry release storage after manual compact");
release_storage_after_manual_compact();
},
0,
std::chrono::minutes(5));
}
// update rocksdb statistics immediately
update_replica_rocksdb_statistics();
return _meta_store->get_last_manual_compact_finish_time();
}
bool pegasus_server_impl::release_storage_after_manual_compact()
{
int64_t old_last_durable = last_durable_decree();
// wait for flush before async checkpoint, so that all data gets compacted
uint64_t start_time = dsn_now_ms();
flush_all_family_columns(true);
ddebug_replica("finish flush_all_family_columns, time_used = {} ms", dsn_now_ms() - start_time);
// async checkpoint
ddebug_replica("start async_checkpoint");
start_time = dsn_now_ms();
::dsn::error_code err = async_checkpoint(false);
ddebug_replica("finish async_checkpoint, return = {}, time_used = {}ms",
err.to_string(),
dsn_now_ms() - start_time);
// gc checkpoints
ddebug_replica("start gc_checkpoints");
start_time = dsn_now_ms();
gc_checkpoints(true);
ddebug_replica("finish gc_checkpoints, time_used = {}ms", dsn_now_ms() - start_time);
int64_t new_last_durable = last_flushed_decree();
if (new_last_durable > old_last_durable) {
ddebug_replica("release storage succeed, last_durable_decree changed from {} to {}",
old_last_durable,
new_last_durable);
return true;
} else {
ddebug_replica("release storage failed, last_durable_decree remains {}", new_last_durable);
return false;
}
}
std::string pegasus_server_impl::query_compact_state() const
{
return _manual_compact_svc.query_compact_state();
}
void pegasus_server_impl::set_partition_version(int32_t partition_version)
{
int32_t old_partition_version = _partition_version.exchange(partition_version);
ddebug_replica(
"update partition version from {} to {}", old_partition_version, partition_version);
_key_ttl_compaction_filter_factory->SetPartitionVersion(partition_version);
}
::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
{
rocksdb::FlushOptions options;
options.wait = wait;
rocksdb::Status status = _db->Flush(options, {_meta_cf, _data_cf});
if (!status.ok()) {
derror_replica("flush failed, error = {}", status.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
return ::dsn::ERR_OK;
}
void pegasus_server_impl::release_db()
{
if (_db) {
dassert_replica(_data_cf != nullptr && _meta_cf != nullptr, "");
_db->DestroyColumnFamilyHandle(_data_cf);
_data_cf = nullptr;
_db->DestroyColumnFamilyHandle(_meta_cf);
_meta_cf = nullptr;
delete _db;
_db = nullptr;
}
}
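// Renders a short human-readable summary of a write request (put, multi_put,
// check_and_set, check_and_mutate) for diagnostic logging; any other request
// type is summarized as "default".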
std::string pegasus_server_impl::dump_write_request(dsn::message_ex *request)
{
dsn::task_code rpc_code(request->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto put = put_rpc(request).request();
::dsn::blob hash_key, sort_key;
pegasus_restore_key(put.key, hash_key, sort_key);
return fmt::format("put: hash_key={}, sort_key={}",
pegasus::utils::c_escape_string(hash_key),
pegasus::utils::c_escape_string(sort_key));
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
auto multi_put = multi_put_rpc(request).request();
return fmt::format("multi_put: hash_key={}, multi_put_count={}",
pegasus::utils::c_escape_string(multi_put.hash_key),
multi_put.kvs.size());
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
auto check_and_set = check_and_set_rpc(request).request();
return fmt::format("check_and_set: hash_key={}, check_sort_key={}, set_sort_key={}",
pegasus::utils::c_escape_string(check_and_set.hash_key),
pegasus::utils::c_escape_string(check_and_set.check_sort_key),
pegasus::utils::c_escape_string(check_and_set.set_sort_key));
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
auto check_and_mutate = check_and_mutate_rpc(request).request();
return fmt::format("check_and_mutate: hash_key={}, check_sort_key={}, set_value_count={}",
pegasus::utils::c_escape_string(check_and_mutate.hash_key),
pegasus::utils::c_escape_string(check_and_mutate.check_sort_key),
check_and_mutate.mutate_list.size());
}
return "default";
}
void pegasus_server_impl::set_ingestion_status(dsn::replication::ingestion_status::type status)
{
ddebug_replica("ingestion status from {} to {}",
dsn::enum_to_string(_ingestion_status),
dsn::enum_to_string(status));
_ingestion_status = status;
}
void pegasus_server_impl::on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
if (dsn_unlikely(req.action != dsn::replication::detect_action::START &&
req.action != dsn::replication::detect_action::STOP &&
req.action != dsn::replication::detect_action::QUERY)) {
resp.err = dsn::ERR_INVALID_PARAMETERS;
resp.__set_err_hint("invalid detect_action");
return;
}
if (dsn_unlikely(req.type != dsn::replication::hotkey_type::READ &&
req.type != dsn::replication::hotkey_type::WRITE)) {
resp.err = dsn::ERR_INVALID_PARAMETERS;
resp.__set_err_hint("invalid hotkey_type");
return;
}
auto collector = req.type == dsn::replication::hotkey_type::READ ? _read_hotkey_collector
: _write_hotkey_collector;
collector->handle_rpc(req, resp);
}
uint32_t pegasus_server_impl::query_data_version() const { return _pegasus_data_version; }
} // namespace server
} // namespace pegasus