blob: f20f0cad4610c94831fe7ad38b2f565f9faa36c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pegasus_manual_compact_service.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication.codes.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/utility/string_conv.h>
#include <dsn/utils/time_utils.h>
#include "base/pegasus_const.h"
#include "pegasus_server_impl.h"
namespace pegasus {
namespace server {
// Task code under which manual compaction jobs are enqueued by
// start_manual_compact_if_needed(); declared here with TASK_PRIORITY_COMMON and
// THREAD_POOL_COMPACT.
DEFINE_TASK_CODE(LPC_MANUAL_COMPACT, TASK_PRIORITY_COMMON, THREAD_POOL_COMPACT)
// Builds the manual compaction helper bound to `app`.
//
// The service starts enabled, with the concurrency limit at INT_MAX and every timestamp
// at 0. The minimal interval between two compactions is fetched through
// dsn_config_get_value_uint64 ("pegasus.server" / "manual_compact_min_interval_seconds",
// fallback 0), and two "app.pegasus" number counters are initialised: one for queued
// compactions and one for running compactions.
pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_impl *app)
    : replica_base(*app),
      _app(app),
      _disabled(false),
      _max_concurrent_running_count(INT_MAX),
      _manual_compact_enqueue_time_ms(0),
      _manual_compact_start_running_time_ms(0),
      _manual_compact_last_finish_time_ms(0),
      _manual_compact_last_time_used_ms(0)
{
    const uint64_t configured_interval_seconds = dsn_config_get_value_uint64(
        "pegasus.server",
        "manual_compact_min_interval_seconds",
        0,
        "minimal interval time in seconds to start a new manual compaction, "
        "<= 0 means no interval limit");
    // The member is kept as a 32-bit signed value; the config value is narrowed to it.
    _manual_compact_min_interval_seconds = static_cast<int32_t>(configured_interval_seconds);

    _pfc_manual_compact_enqueue_count.init_app_counter("app.pegasus",
                                                       "manual.compact.enqueue.count",
                                                       COUNTER_TYPE_NUMBER,
                                                       "current manual compact in queue count");

    _pfc_manual_compact_running_count.init_app_counter("app.pegasus",
                                                       "manual.compact.running.count",
                                                       COUNTER_TYPE_NUMBER,
                                                       "current manual compact running count");
}
// Seeds the "last finish" timestamp (milliseconds) with a caller-supplied value.
void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_finish_time_ms)
{
    // Atomic assignment; equivalent to a default-ordered store().
    _manual_compact_last_finish_time_ms = last_finish_time_ms;
}
void pegasus_manual_compact_service::start_manual_compact_if_needed(
const std::map<std::string, std::string> &envs)
{
if (check_compact_disabled(envs)) {
ddebug_replica("ignored compact because disabled");
return;
}
if (check_compact_max_concurrent_running_count(envs) <= 0) {
ddebug_replica("ignored compact because max_concurrent_running_count <= 0");
return;
}
std::string compact_rule;
if (check_once_compact(envs)) {
compact_rule = MANUAL_COMPACT_ONCE_KEY_PREFIX;
}
if (compact_rule.empty() && check_periodic_compact(envs)) {
compact_rule = MANUAL_COMPACT_PERIODIC_KEY_PREFIX;
}
if (compact_rule.empty()) {
return;
}
if (check_manual_compact_state()) {
rocksdb::CompactRangeOptions options;
extract_manual_compact_opts(envs, compact_rule, options);
_pfc_manual_compact_enqueue_count->increment();
dsn::tasking::enqueue(LPC_MANUAL_COMPACT, &_app->_tracker, [this, options]() {
_pfc_manual_compact_enqueue_count->decrement();
manual_compact(options);
});
} else {
ddebug_replica("ignored compact because last one is on going or just finished");
}
}
bool pegasus_manual_compact_service::check_compact_disabled(
const std::map<std::string, std::string> &envs)
{
bool new_disabled = false;
auto find = envs.find(MANUAL_COMPACT_DISABLED_KEY);
if (find != envs.end() && find->second == "true") {
new_disabled = true;
}
bool old_disabled = _disabled.load();
if (new_disabled != old_disabled) {
// flag changed
if (new_disabled) {
ddebug_replica("manual compact is set to disabled now");
_disabled.store(true);
} else {
ddebug_replica("manual compact is set to enabled now");
_disabled.store(false);
}
}
return new_disabled;
}
// Reads the max concurrent running count from `envs` and mirrors it into
// `_max_concurrent_running_count`.
//
// When the key is absent the limit is INT_MAX. When the value cannot be parsed an error is
// logged and whatever dsn::buf2int32 left in the output is used (presumably the INT_MAX
// initial value -- confirm against buf2int32). A change of the limit is logged.
// Returns the limit derived from `envs`.
int pegasus_manual_compact_service::check_compact_max_concurrent_running_count(
    const std::map<std::string, std::string> &envs)
{
    int requested_count = INT_MAX;
    const auto it = envs.find(MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY);
    if (it != envs.end()) {
        if (!dsn::buf2int32(it->second, requested_count)) {
            derror_replica("{}={} is invalid.", it->first, it->second);
        }
    }

    const int previous_count = _max_concurrent_running_count.load();
    if (requested_count != previous_count) {
        ddebug_replica("max_concurrent_running_count changed from {} to {}",
                       previous_count,
                       requested_count);
        _max_concurrent_running_count.store(requested_count);
    }
    return requested_count;
}
// Tells whether the one-shot rule in `envs` requests a compaction.
//
// The value under MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY must parse as a positive 64-bit
// integer; otherwise an error is logged and false is returned. The rule fires when that
// trigger time is greater than the last finish time converted from milliseconds to seconds.
bool pegasus_manual_compact_service::check_once_compact(
    const std::map<std::string, std::string> &envs)
{
    const auto it = envs.find(MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY);
    if (it == envs.end()) {
        return false;
    }

    int64_t trigger_time = 0;
    const bool parsed = dsn::buf2int64(it->second, trigger_time);
    if (!parsed || trigger_time <= 0) {
        derror_replica("{}={} is invalid.", it->first, it->second);
        return false;
    }

    const auto last_finish_seconds = _manual_compact_last_finish_time_ms.load() / 1000;
    return trigger_time > last_finish_seconds;
}
// Tells whether the periodic rule in `envs` requests a compaction.
//
// The value under MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY is split on ',' and each piece is
// converted with dsn::utils::hh_mm_today_to_unix_sec (presumably "HH:MM" of the current day
// to unix seconds -- confirm against that helper). Pieces converted to -1 are dropped.
// If nothing usable remains an error is logged and false is returned.
// The rule fires when some trigger time lies strictly between the last finish time and now.
bool pegasus_manual_compact_service::check_periodic_compact(
    const std::map<std::string, std::string> &envs)
{
    const auto it = envs.find(MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY);
    if (it == envs.end()) {
        return false;
    }

    std::list<std::string> pieces;
    dsn::utils::split_args(it->second.c_str(), pieces, ',');
    if (pieces.empty()) {
        derror_replica("{}={} is invalid.", it->first, it->second);
        return false;
    }

    // Collect the distinct trigger points (seconds), skipping the unparsable pieces.
    std::set<int64_t> trigger_seconds;
    for (const std::string &piece : pieces) {
        const int64_t seconds = dsn::utils::hh_mm_today_to_unix_sec(piece);
        if (seconds == -1) {
            continue;
        }
        trigger_seconds.insert(seconds);
    }
    if (trigger_seconds.empty()) {
        derror_replica("{}={} is invalid.", it->first, it->second);
        return false;
    }

    const auto now_ms = static_cast<int64_t>(now_timestamp());
    for (const int64_t seconds : trigger_seconds) {
        const auto trigger_ms = seconds * 1000;
        // Due when: last finish < trigger point < now.
        if (_manual_compact_last_finish_time_ms.load() < trigger_ms) {
            if (trigger_ms < now_ms) {
                return true;
            }
        }
    }
    return false;
}
// Returns the current time in milliseconds from dsn_now_ms().
//
// When built with PEGASUS_UNIT_TEST, a non-zero `_mock_now_timestamp` overrides the clock
// (and its value is logged on every call).
uint64_t pegasus_manual_compact_service::now_timestamp()
{
#ifdef PEGASUS_UNIT_TEST
    ddebug_replica("_mock_now_timestamp={}", _mock_now_timestamp);
    if (_mock_now_timestamp != 0) {
        return _mock_now_timestamp;
    }
#endif
    return dsn_now_ms();
}
// Fills `options` with the RocksDB CompactRange settings for one manual compaction.
//
// envs:       key/value pairs the settings are read from.
// key_prefix: rule prefix prepended to each option key before it is looked up in `envs`.
// options:    output. exclusive_manual_compaction and change_level are always set to true.
//             target_level defaults to -1 and bottommost_level_compaction to kSkip; each is
//             overridden by the corresponding entry in `envs` when that entry is valid.
//
// An invalid entry is ignored (the default is kept) and a warning is logged.
void pegasus_manual_compact_service::extract_manual_compact_opts(
    const std::map<std::string, std::string> &envs,
    const std::string &key_prefix,
    rocksdb::CompactRangeOptions &options)
{
    options.exclusive_manual_compaction = true;
    options.change_level = true;

    options.target_level = -1;
    auto find = envs.find(key_prefix + MANUAL_COMPACT_TARGET_LEVEL_KEY);
    if (find != envs.end()) {
        int32_t target_level = 0;
        // Accepted values: -1, or an existing level other than level 0.
        // RocksDB numbers levels 0 .. num_levels - 1, so `num_levels` itself is not an
        // existing level and has to be rejected here (strict '<').
        if (dsn::buf2int32(find->second, target_level) &&
            (target_level == -1 ||
             (target_level >= 1 && target_level < _app->_data_cf_opts.num_levels))) {
            options.target_level = target_level;
        } else {
            dwarn_replica("{}={} is invalid, use default value {}",
                          find->first,
                          find->second,
                          options.target_level);
        }
    }

    options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kSkip;
    find = envs.find(key_prefix + MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY);
    if (find != envs.end()) {
        const std::string &argv = find->second;
        if (argv == MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE) {
            options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;
        } else if (argv == MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP) {
            options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kSkip;
        } else {
            dwarn_replica(
                "{}={} is invalid, use default value {}",
                find->first,
                find->second,
                // NOTICE: must stay in sync with the default assigned to
                // options.bottommost_level_compaction above.
                MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP);
        }
    }
}
// Decides whether a new manual compaction may be enqueued right now and, if so, claims the
// slot.
//
// The interval requirement is satisfied when there is no interval limit, when no compaction
// has finished yet, or when more than the minimal interval has elapsed since the last finish.
// If it is satisfied, `_manual_compact_enqueue_time_ms` is atomically switched from 0
// ("nothing enqueued") to the current time; the result of that exchange is returned, so
// false means a compaction is already enqueued and the stored time is left untouched.
bool pegasus_manual_compact_service::check_manual_compact_state()
{
    const uint64_t now = now_timestamp();

    const bool interval_satisfied =
        _manual_compact_min_interval_seconds <= 0 ||       // no interval limit
        _manual_compact_last_finish_time_ms.load() == 0 || // has not compacted yet
        now - _manual_compact_last_finish_time_ms.load() >
            static_cast<uint64_t>(_manual_compact_min_interval_seconds) * 1000; // interval past
    if (!interval_satisfied) {
        return false;
    }

    // 0 is the "not enqueued" marker expected by the exchange below.
    uint64_t expected_not_enqueued = 0;
    return _manual_compact_enqueue_time_ms.compare_exchange_strong(expected_not_enqueued, now);
}
// Body of the enqueued compaction task: re-checks the gates, then runs the compaction.
//
// The task is dropped (and the enqueue timestamp reset to 0 so a later request can be
// enqueued) when manual compaction got disabled in the meantime, or when taking a running
// slot would exceed `_max_concurrent_running_count`.
void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeOptions &options)
{
    // Gate 1: the switch may have been turned off while the task sat in the queue.
    if (_disabled.load()) {
        ddebug_replica("ignored compact because disabled");
        _manual_compact_enqueue_time_ms.store(0);
        return;
    }

    // Gate 2: take a running slot first, then give it back if that went over the limit.
    _pfc_manual_compact_running_count->increment();
    const bool over_limit = _pfc_manual_compact_running_count->get_integer_value() >
                            _max_concurrent_running_count.load();
    if (over_limit) {
        _pfc_manual_compact_running_count->decrement();
        ddebug_replica("ignored compact because exceed max_concurrent_running_count({})",
                       _max_concurrent_running_count.load());
        _manual_compact_enqueue_time_ms.store(0);
        return;
    }

    const uint64_t started_at = begin_manual_compact();
    const uint64_t finished_at = _app->do_manual_compact(options);
    end_manual_compact(started_at, finished_at);

    _pfc_manual_compact_running_count->decrement();
}
// Marks the start of a compaction: logs it, records the current time as the running-start
// timestamp and returns that timestamp (milliseconds).
uint64_t pegasus_manual_compact_service::begin_manual_compact()
{
    ddebug_replica("start to execute manual compaction");

    const uint64_t started_at_ms = now_timestamp();
    _manual_compact_start_running_time_ms.store(started_at_ms);
    return started_at_ms;
}
// Marks the end of a compaction.
//
// start / finish: begin and end timestamps in milliseconds.
// Records `finish` as the last finish time and `finish - start` as the last duration, then
// clears the enqueue and running-start timestamps (0 means "none").
void pegasus_manual_compact_service::end_manual_compact(uint64_t start, uint64_t finish)
{
    const uint64_t time_used_ms = finish - start;
    ddebug_replica("finish to execute manual compaction, time_used = {}ms", time_used_ms);

    _manual_compact_last_finish_time_ms.store(finish);
    _manual_compact_last_time_used_ms.store(time_used_ms);
    _manual_compact_enqueue_time_ms.store(0);
    _manual_compact_start_running_time_ms.store(0);
}
// Renders the compaction bookkeeping as a human-readable, comma-separated string.
//
// Always contains "last finish at [...]" ("-" when nothing has finished yet); the last
// duration, the recent enqueue time and the recent start time are appended only when their
// stored value is non-zero.
std::string pegasus_manual_compact_service::query_compact_state() const
{
    const uint64_t enqueued_ms = _manual_compact_enqueue_time_ms.load();
    const uint64_t started_ms = _manual_compact_start_running_time_ms.load();
    const uint64_t last_finished_ms = _manual_compact_last_finish_time_ms.load();
    const uint64_t last_used_ms = _manual_compact_last_time_used_ms.load();

    // Formats a millisecond timestamp through dsn::utils::time_ms_to_string.
    const auto format_time = [](uint64_t ts_ms) -> std::string {
        char buf[24];
        dsn::utils::time_ms_to_string(ts_ms, buf);
        return buf;
    };

    std::stringstream out;
    out << "last finish at ["
        << (last_finished_ms > 0 ? format_time(last_finished_ms) : std::string("-")) << "]";

    if (last_used_ms > 0) {
        out << ", last used " << last_used_ms << " ms";
    }
    if (enqueued_ms > 0) {
        out << ", recent enqueue at [" << format_time(enqueued_ms) << "]";
    }
    if (started_ms > 0) {
        out << ", recent start at [" << format_time(started_ms) << "]";
    }
    return out.str();
}
} // namespace server
} // namespace pegasus