refactor(command_manager): Simplify command_manager code (#1875)
- Simplify the registration of boolean remote command, now it can be registered
by `ommand_manager::register_bool_command(bool &value, const std::string &command, const std::string &help)`
- Change the type of `node_live_percentage_threshold_for_update` from
uint64 to int32, there is no impact bacause it's a percentage which
is range in [0, 100].
- Change the type of `replica_assign_delay_ms_for_dropouts` from uint64
to int64, there is less impact bacause int64 is big enough to set a
milliseconds time interval (maximum to 106751991167 days).
- Change the type of `max_copy_rate_megabytes_per_disk`,
`max_send_rate_megabytes_per_disk` to int64, there is no impact
bacause its range is enlarged.
- Use FLAGS_max_concurrent_bulk_load_downloading_count directly and remove
it from replication_options structure and replica_stub class.
diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp
index 3b0a7f0..b5d2571 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -48,10 +48,6 @@
namespace replication {
DSN_DEFINE_bool(replication, duplication_enabled, true, "is duplication enabled");
-DSN_DEFINE_int32(replication,
- max_concurrent_bulk_load_downloading_count,
- 5,
- "concurrent bulk load downloading replica count");
DSN_DEFINE_int32(replication,
mutation_2pc_min_replica_count,
@@ -144,8 +140,6 @@
CHECK(!data_dirs.empty(), "no replica data dir found, maybe not set or excluded by black list");
- max_concurrent_bulk_load_downloading_count = FLAGS_max_concurrent_bulk_load_downloading_count;
-
CHECK(replica_helper::load_meta_servers(meta_servers), "invalid meta server config");
}
diff --git a/src/common/replication_common.h b/src/common/replication_common.h
index 3771b2c..30412e7 100644
--- a/src/common/replication_common.h
+++ b/src/common/replication_common.h
@@ -64,8 +64,6 @@
std::vector<std::string> data_dirs;
std::vector<std::string> data_dir_tags;
- int32_t max_concurrent_bulk_load_downloading_count;
-
public:
replication_options() = default;
~replication_options();
diff --git a/src/meta/app_balance_policy.cpp b/src/meta/app_balance_policy.cpp
index 972f8ba..00ebbc6 100644
--- a/src/meta/app_balance_policy.cpp
+++ b/src/meta/app_balance_policy.cpp
@@ -20,7 +20,6 @@
#include <iterator>
#include <map>
#include <set>
-#include <string>
#include "app_balance_policy.h"
#include "common/gpid.h"
@@ -50,30 +49,18 @@
_only_primary_balancer = false;
_only_move_primary = false;
}
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.lb.balancer_in_turn"},
- "meta.lb.balancer_in_turn <true|false>",
- "control whether do app balancer in turn",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(_balancer_in_turn, "lb.balancer_in_turn", args);
- }));
+ _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+ _balancer_in_turn, "meta.lb.balancer_in_turn", "control whether do app balancer in turn"));
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.lb.only_primary_balancer"},
- "meta.lb.only_primary_balancer <true|false>",
- "control whether do only primary balancer",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(
- _only_primary_balancer, "lb.only_primary_balancer", args);
- }));
+ _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+ _only_primary_balancer,
+ "meta.lb.only_primary_balancer",
+ "control whether do only primary balancer"));
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.lb.only_move_primary"},
- "meta.lb.only_move_primary <true|false>",
- "control whether only move primary in balancer",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(_only_move_primary, "lb.only_move_primary", args);
- }));
+ _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+ _only_move_primary,
+ "meta.lb.only_move_primary",
+ "control whether only move primary in balancer"));
}
void app_balance_policy::balance(bool checker, const meta_view *global_view, migration_list *list)
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 8fc64c1..952b33a 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -55,8 +55,8 @@
#include "meta_service.h"
#include "meta_split_service.h"
#include "partition_split_types.h"
-#include "remote_cmd/remote_command.h"
#include "ranger/ranger_resource_policy_manager.h"
+#include "remote_cmd/remote_command.h"
#include "runtime/rpc/rpc_holder.h"
#include "runtime/task/async_calls.h"
#include "server_load_balancer.h"
@@ -67,7 +67,6 @@
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
-#include "utils/string_conv.h"
#include "utils/strings.h"
METRIC_DEFINE_counter(server,
@@ -112,11 +111,13 @@
lb_interval_ms,
10000,
"every this period(ms) the meta server will do load balance");
-DSN_DEFINE_uint64(meta_server,
- node_live_percentage_threshold_for_update,
- 65,
- "If live_node_count * 100 < total_node_count * "
- "node_live_percentage_threshold_for_update, then freeze the cluster.");
+DSN_DEFINE_int32(meta_server,
+ node_live_percentage_threshold_for_update,
+ 65,
+ "If live_node_count * 100 < total_node_count * "
+ "node_live_percentage_threshold_for_update, then freeze the cluster.");
+DSN_DEFINE_validator(node_live_percentage_threshold_for_update,
+ [](int32_t value) -> bool { return value >= 0 && value <= 100; });
DSN_DEFINE_string(meta_server,
meta_state_service_type,
"meta_state_service_simple",
@@ -303,29 +304,12 @@
void meta_service::register_ctrl_commands()
{
_ctrl_node_live_percentage_threshold_for_update =
- dsn::command_manager::instance().register_command(
- {"meta.live_percentage"},
- "meta.live_percentage [num | DEFAULT]",
+ dsn::command_manager::instance().register_int_command(
+ _node_live_percentage_threshold_for_update,
+ FLAGS_node_live_percentage_threshold_for_update,
+ "meta.live_percentage",
"node live percentage threshold for update",
- [this](const std::vector<std::string> &args) {
- std::string result("OK");
- if (args.empty()) {
- result = std::to_string(_node_live_percentage_threshold_for_update);
- } else {
- if (args[0] == "DEFAULT") {
- _node_live_percentage_threshold_for_update =
- FLAGS_node_live_percentage_threshold_for_update;
- } else {
- int32_t v = 0;
- if (!dsn::buf2int32(args[0], v) || v < 0) {
- result = std::string("ERR: invalid arguments");
- } else {
- _node_live_percentage_threshold_for_update = v;
- }
- }
- }
- return result;
- });
+ [](int32_t new_value) -> bool { return new_value >= 0 && new_value <= 100; });
}
void meta_service::start_service()
diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h
index e46ad90..4b531a9 100644
--- a/src/meta/meta_service.h
+++ b/src/meta/meta_service.h
@@ -338,7 +338,7 @@
replication_options _opts;
meta_options _meta_opts;
- uint64_t _node_live_percentage_threshold_for_update;
+ int32_t _node_live_percentage_threshold_for_update;
std::unique_ptr<command_deregister> _ctrl_node_live_percentage_threshold_for_update;
std::shared_ptr<server_state> _state;
diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp
index 0c16106..d99ce22 100644
--- a/src/meta/partition_guardian.cpp
+++ b/src/meta/partition_guardian.cpp
@@ -21,6 +21,7 @@
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
+#include <cstdint>
#include <ostream>
#include <unordered_map>
@@ -34,7 +35,6 @@
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
-#include "utils/string_conv.h"
#include "utils/strings.h"
#include "utils/time_utils.h"
@@ -42,10 +42,10 @@
namespace replication {
DSN_DEFINE_int32(meta_server, max_replicas_in_group, 4, "max replicas(alive & dead) in a group");
-DSN_DEFINE_uint64(meta_server,
- replica_assign_delay_ms_for_dropouts,
- 300000,
- "The delay milliseconds to dropout replicas assign");
+DSN_DEFINE_int64(meta_server,
+ replica_assign_delay_ms_for_dropouts,
+ 300000,
+ "The delay milliseconds to dropout replicas assign");
partition_guardian::partition_guardian(meta_service *svc) : _svc(svc)
{
@@ -681,12 +681,11 @@
void partition_guardian::register_ctrl_commands()
{
- // TODO(yingchun): update _replica_assign_delay_ms_for_dropouts by http
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.lb.assign_delay_ms"},
- "lb.assign_delay_ms [num | DEFAULT]",
- "control the replica_assign_delay_ms_for_dropouts config",
- [this](const std::vector<std::string> &args) { return ctrl_assign_delay_ms(args); }));
+ _cmds.emplace_back(dsn::command_manager::instance().register_int_command(
+ _replica_assign_delay_ms_for_dropouts,
+ FLAGS_replica_assign_delay_ms_for_dropouts,
+ "meta.lb.assign_delay_ms",
+ "control the replica_assign_delay_ms_for_dropouts config"));
_cmds.emplace_back(dsn::command_manager::instance().register_command(
{"meta.lb.assign_secondary_black_list"},
@@ -697,26 +696,6 @@
}));
}
-std::string partition_guardian::ctrl_assign_delay_ms(const std::vector<std::string> &args)
-{
- std::string result("OK");
- if (args.empty()) {
- result = std::to_string(_replica_assign_delay_ms_for_dropouts);
- } else {
- if (args[0] == "DEFAULT") {
- _replica_assign_delay_ms_for_dropouts = FLAGS_replica_assign_delay_ms_for_dropouts;
- } else {
- int32_t v = 0;
- if (!dsn::buf2int32(args[0], v) || v <= 0) {
- result = std::string("ERR: invalid arguments");
- } else {
- _replica_assign_delay_ms_for_dropouts = v;
- }
- }
- }
- return result;
-}
-
std::string
partition_guardian::ctrl_assign_secondary_black_list(const std::vector<std::string> &args)
{
diff --git a/src/meta/partition_guardian.h b/src/meta/partition_guardian.h
index 9c77da7..612a56a 100644
--- a/src/meta/partition_guardian.h
+++ b/src/meta/partition_guardian.h
@@ -74,7 +74,6 @@
void finish_cure_proposal(meta_view &view,
const dsn::gpid &gpid,
const configuration_proposal_action &action);
- std::string ctrl_assign_delay_ms(const std::vector<std::string> &args);
std::string ctrl_assign_secondary_black_list(const std::vector<std::string> &args);
void set_ddd_partition(ddd_partition_info &&partition)
@@ -103,7 +102,7 @@
// ]
std::vector<std::unique_ptr<command_deregister>> _cmds;
- uint64_t _replica_assign_delay_ms_for_dropouts;
+ int64_t _replica_assign_delay_ms_for_dropouts;
friend class meta_partition_guardian_test;
};
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 9ff3c38..aef6d03 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -38,6 +38,7 @@
#include <sstream> // IWYU pragma: keep
#include <string>
#include <thread>
+// IWYU pragma: no_include <type_traits>
#include <unordered_map>
#include "app_env_validator.h"
@@ -163,38 +164,16 @@
return std::string(err.to_string());
}));
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.lb.add_secondary_enable_flow_control"},
- "meta.lb.add_secondary_enable_flow_control <true|false>",
- "control whether enable add secondary flow control",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(
- _add_secondary_enable_flow_control, "lb.add_secondary_enable_flow_control", args);
- }));
+ _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+ _add_secondary_enable_flow_control,
+ "meta.lb.add_secondary_enable_flow_control",
+ "control whether enable add secondary flow control"));
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.lb.add_secondary_max_count_for_one_node"},
- "meta.lb.add_secondary_max_count_for_one_node [num | DEFAULT]",
- "control the max count to add secondary for one node",
- [this](const std::vector<std::string> &args) {
- std::string result("OK");
- if (args.empty()) {
- result = std::to_string(_add_secondary_max_count_for_one_node);
- } else {
- if (args[0] == "DEFAULT") {
- _add_secondary_max_count_for_one_node =
- FLAGS_add_secondary_max_count_for_one_node;
- } else {
- int32_t v = 0;
- if (!dsn::buf2int32(args[0], v) || v < 0) {
- result = std::string("ERR: invalid arguments");
- } else {
- _add_secondary_max_count_for_one_node = v;
- }
- }
- }
- return result;
- }));
+ _cmds.emplace_back(dsn::command_manager::instance().register_int_command(
+ _add_secondary_max_count_for_one_node,
+ FLAGS_add_secondary_max_count_for_one_node,
+ "meta.lb.add_secondary_max_count_for_one_node",
+ "control the max count to add secondary for one node"));
}
void server_state::initialize(meta_service *meta_svc, const std::string &apps_root)
diff --git a/src/meta/test/meta_test_base.cpp b/src/meta/test/meta_test_base.cpp
index 720efcf..6fc5371 100644
--- a/src/meta/test/meta_test_base.cpp
+++ b/src/meta/test/meta_test_base.cpp
@@ -116,7 +116,7 @@
FLAGS_min_live_node_count_for_unfreeze = node_count;
}
-void meta_test_base::set_node_live_percentage_threshold_for_update(uint64_t percentage_threshold)
+void meta_test_base::set_node_live_percentage_threshold_for_update(int32_t percentage_threshold)
{
_ms->_node_live_percentage_threshold_for_update = percentage_threshold;
}
diff --git a/src/meta/test/meta_test_base.h b/src/meta/test/meta_test_base.h
index 9422e21..843ff91 100644
--- a/src/meta/test/meta_test_base.h
+++ b/src/meta/test/meta_test_base.h
@@ -53,7 +53,7 @@
void set_min_live_node_count_for_unfreeze(uint64_t node_count);
- void set_node_live_percentage_threshold_for_update(uint64_t percentage_threshold);
+ void set_node_live_percentage_threshold_for_update(int32_t percentage_threshold);
std::vector<rpc_address> ensure_enough_alive_nodes(int min_node_count);
diff --git a/src/meta/test/update_configuration_test.cpp b/src/meta/test/update_configuration_test.cpp
index 2c23d23..d41d47e 100644
--- a/src/meta/test/update_configuration_test.cpp
+++ b/src/meta/test/update_configuration_test.cpp
@@ -70,9 +70,9 @@
namespace dsn {
namespace replication {
+DSN_DECLARE_int32(node_live_percentage_threshold_for_update);
+DSN_DECLARE_int64(replica_assign_delay_ms_for_dropouts);
DSN_DECLARE_uint64(min_live_node_count_for_unfreeze);
-DSN_DECLARE_uint64(node_live_percentage_threshold_for_update);
-DSN_DECLARE_uint64(replica_assign_delay_ms_for_dropouts);
class fake_sender_meta_service : public dsn::replication::meta_service
{
diff --git a/src/nfs/nfs_client_impl.cpp b/src/nfs/nfs_client_impl.cpp
index b6d52e9..63cb1b8 100644
--- a/src/nfs/nfs_client_impl.cpp
+++ b/src/nfs/nfs_client_impl.cpp
@@ -30,6 +30,7 @@
#include <mutex>
#include "absl/strings/string_view.h"
+#include "fmt/core.h"
#include "nfs/nfs_code_definition.h"
#include "nfs/nfs_node.h"
#include "utils/blob.h"
@@ -37,7 +38,6 @@
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
-#include "utils/string_conv.h"
#include "utils/token_buckets.h"
METRIC_DEFINE_counter(server,
@@ -69,17 +69,18 @@
nfs_copy_block_bytes,
4 * 1024 * 1024,
"max block size (bytes) for each network copy");
-DSN_DEFINE_uint32(
- nfs,
- max_copy_rate_megabytes_per_disk,
- 0,
- "max rate per disk of copying from remote node(MB/s), zero means disable rate limiter");
+static const char *kMaxCopyRateMegaBytesPerDiskDesc =
+ "The maximum bandwidth (MB/s) of writing data per local disk when copying from remote node, 0 "
+ "means no limit";
+DSN_DEFINE_int64(nfs, max_copy_rate_megabytes_per_disk, 0, kMaxCopyRateMegaBytesPerDiskDesc);
DSN_TAG_VARIABLE(max_copy_rate_megabytes_per_disk, FT_MUTABLE);
-// max_copy_rate_bytes should be zero or greater than nfs_copy_block_bytes which is the max
-// batch copy size once
+
+bool check_max_copy_rate_megabytes_per_disk(int64_t value)
+{
+ return value == 0 || (value << 20) > FLAGS_nfs_copy_block_bytes;
+}
DSN_DEFINE_group_validator(max_copy_rate_megabytes_per_disk, [](std::string &message) -> bool {
- return FLAGS_max_copy_rate_megabytes_per_disk == 0 ||
- (FLAGS_max_copy_rate_megabytes_per_disk << 20) > FLAGS_nfs_copy_block_bytes;
+ return check_max_copy_rate_megabytes_per_disk(FLAGS_max_copy_rate_megabytes_per_disk);
});
DSN_DEFINE_int32(nfs,
@@ -582,33 +583,15 @@
{
static std::once_flag flag;
std::call_once(flag, [&]() {
- _nfs_max_copy_rate_megabytes_cmd = dsn::command_manager::instance().register_command(
- {"nfs.max_copy_rate_megabytes_per_disk"},
- "nfs.max_copy_rate_megabytes_per_disk [num]",
- "control the max rate(MB/s) for one disk to copy file from remote node",
- [](const std::vector<std::string> &args) {
- std::string result("OK");
-
- if (args.empty()) {
- return std::to_string(FLAGS_max_copy_rate_megabytes_per_disk);
- }
-
- int32_t max_copy_rate_megabytes = 0;
- if (!dsn::buf2int32(args[0], max_copy_rate_megabytes) ||
- max_copy_rate_megabytes <= 0) {
- return std::string("ERR: invalid arguments");
- }
-
- uint32_t max_copy_rate_bytes = max_copy_rate_megabytes << 20;
- if (max_copy_rate_bytes <= FLAGS_nfs_copy_block_bytes) {
- result = std::string("ERR: max_copy_rate_bytes(max_copy_rate_megabytes << 20) "
- "should be greater than nfs_copy_block_bytes:")
- .append(std::to_string(FLAGS_nfs_copy_block_bytes));
- return result;
- }
- FLAGS_max_copy_rate_megabytes_per_disk = max_copy_rate_megabytes;
- return result;
- });
+ _nfs_max_copy_rate_megabytes_cmd = dsn::command_manager::instance().register_int_command(
+ FLAGS_max_copy_rate_megabytes_per_disk,
+ FLAGS_max_copy_rate_megabytes_per_disk,
+ "nfs.max_copy_rate_megabytes_per_disk",
+ fmt::format("{}, "
+ "should be greater than 'nfs_copy_block_bytes' which is {}",
+ kMaxCopyRateMegaBytesPerDiskDesc,
+ FLAGS_nfs_copy_block_bytes),
+ &check_max_copy_rate_megabytes_per_disk);
});
}
} // namespace service
diff --git a/src/nfs/nfs_server_impl.cpp b/src/nfs/nfs_server_impl.cpp
index 6cce235..80fbea5 100644
--- a/src/nfs/nfs_server_impl.cpp
+++ b/src/nfs/nfs_server_impl.cpp
@@ -42,7 +42,6 @@
#include "utils/env.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
-#include "utils/string_conv.h"
#include "utils/utils.h"
METRIC_DEFINE_counter(
@@ -62,11 +61,10 @@
namespace service {
-DSN_DEFINE_uint32(
- nfs,
- max_send_rate_megabytes_per_disk,
- 0,
- "max rate per disk of send to remote node(MB/s),zero means disable rate limiter");
+static const char *kMaxSendRateMegaBytesPerDiskDesc =
+ "The maximum bandwidth (MB/s) of reading data per local disk "
+ "when transferring data to remote node, 0 means no limit";
+DSN_DEFINE_int64(nfs, max_send_rate_megabytes_per_disk, 0, kMaxSendRateMegaBytesPerDiskDesc);
DSN_TAG_VARIABLE(max_send_rate_megabytes_per_disk, FT_MUTABLE);
DSN_DECLARE_int32(file_close_timer_interval_ms_on_server);
@@ -261,26 +259,11 @@
{
static std::once_flag flag;
std::call_once(flag, [&]() {
- _nfs_max_send_rate_megabytes_cmd = dsn::command_manager::instance().register_command(
- {"nfs.max_send_rate_megabytes_per_disk"},
- "nfs.max_send_rate_megabytes_per_disk [num]",
- "control the max rate(MB/s) for one disk to send file to remote node",
- [](const std::vector<std::string> &args) {
- std::string result("OK");
-
- if (args.empty()) {
- return std::to_string(FLAGS_max_send_rate_megabytes_per_disk);
- }
-
- int32_t max_send_rate_megabytes = 0;
- if (!dsn::buf2int32(args[0], max_send_rate_megabytes) ||
- max_send_rate_megabytes <= 0) {
- return std::string("ERR: invalid arguments");
- }
-
- FLAGS_max_send_rate_megabytes_per_disk = max_send_rate_megabytes;
- return result;
- });
+ _nfs_max_send_rate_megabytes_cmd = dsn::command_manager::instance().register_int_command(
+ FLAGS_max_send_rate_megabytes_per_disk,
+ FLAGS_max_send_rate_megabytes_per_disk,
+ "nfs.max_send_rate_megabytes_per_disk",
+ kMaxSendRateMegaBytesPerDiskDesc);
});
}
diff --git a/src/replica/backup/cold_backup_context.cpp b/src/replica/backup/cold_backup_context.cpp
index fb7876a..a96dd94 100644
--- a/src/replica/backup/cold_backup_context.cpp
+++ b/src/replica/backup/cold_backup_context.cpp
@@ -20,6 +20,7 @@
#include <chrono>
#include <cstdint>
#include <memory>
+// IWYU pragma: no_include <type_traits>
#include "common/backup_common.h"
#include "common/replication.codes.h"
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp
index ba32e41..1306eae 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -44,6 +44,7 @@
#include "utils/env.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/load_dump_object.h"
#include "utils/thread_access_checker.h"
@@ -83,6 +84,8 @@
dsn::metric_unit::kBytes,
"The size of files that have been downloaded successfully for bulk loads");
+DSN_DECLARE_int32(max_concurrent_bulk_load_downloading_count);
+
namespace dsn {
namespace dist {
namespace block_service {
@@ -425,7 +428,7 @@
const std::string &provider_name)
{
if (_stub->_bulk_load_downloading_count.load() >=
- _stub->_max_concurrent_bulk_load_downloading_count) {
+ FLAGS_max_concurrent_bulk_load_downloading_count) {
LOG_WARNING_PREFIX("node[{}] already has {} replica downloading, wait for next round",
_stub->_primary_address_str,
_stub->_bulk_load_downloading_count.load());
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 2f5a8e6..6a2ca4d 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -78,7 +78,6 @@
#include "utils/ports.h"
#include "utils/process_utils.h"
#include "utils/rand.h"
-#include "utils/string_conv.h"
#include "utils/strings.h"
#include "utils/synchronize.h"
#ifdef DSN_ENABLE_GPERF
@@ -90,6 +89,15 @@
#include "remote_cmd/remote_command.h"
#include "utils/fail_point.h"
+static const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
+ "The maximum concurrent bulk load downloading replica count";
+DSN_DEFINE_int32(replication,
+ max_concurrent_bulk_load_downloading_count,
+ 5,
+ kMaxConcurrentBulkLoadDownloadingCountDesc);
+DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
+ [](int32_t value) -> bool { return value >= 0; });
+
METRIC_DEFINE_gauge_int64(server,
total_replicas,
dsn::metric_unit::kReplicas,
@@ -294,6 +302,12 @@
10,
"if tcmalloc reserved but not-used memory exceed this percentage of application allocated "
"memory, replica server will release the exceeding memory back to operating system");
+bool check_mem_release_max_reserved_mem_percentage(int32_t value)
+{
+ return value > 0 && value <= 100;
+}
+DSN_DEFINE_validator(mem_release_max_reserved_mem_percentage,
+ &check_mem_release_max_reserved_mem_percentage);
DSN_DEFINE_string(
pegasus.server,
@@ -339,7 +353,6 @@
_verbose_commit_log(false),
_release_tcmalloc_memory(false),
_mem_release_max_reserved_mem_percentage(10),
- _max_concurrent_bulk_load_downloading_count(5),
_learn_app_concurrent_count(0),
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
@@ -406,8 +419,6 @@
_verbose_commit_log = FLAGS_verbose_commit_log_on_start;
_release_tcmalloc_memory = FLAGS_mem_release_enabled;
_mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;
- _max_concurrent_bulk_load_downloading_count =
- _options.max_concurrent_bulk_load_downloading_count;
// clear dirs if need
if (clear) {
@@ -2181,32 +2192,18 @@
return std::string(e.to_string());
}));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.deny-client"},
- "replica.deny-client <true|false>",
- "replica.deny-client - control if deny client read & write request",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(_deny_client, "deny-client", args);
- }));
+ _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+ _deny_client, "replica.deny-client", "control if deny client read & write request"));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.verbose-client-log"},
- "replica.verbose-client-log <true|false>",
- "replica.verbose-client-log - control if print verbose error log when reply read & "
- "write request",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(
- _verbose_client_log, "verbose-client-log", args);
- }));
+ _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+ _verbose_client_log,
+ "replica.verbose-client-log",
+ "control if print verbose error log when reply read & write request"));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.verbose-commit-log"},
- "replica.verbose-commit-log <true|false>",
- "replica.verbose-commit-log - control if print verbose log when commit mutation",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(
- _verbose_commit_log, "verbose-commit-log", args);
- }));
+ _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+ _verbose_commit_log,
+ "replica.verbose-commit-log",
+ "control if print verbose log when commit mutation"));
_cmds.emplace_back(::dsn::command_manager::instance().register_command(
{"replica.trigger-checkpoint"},
@@ -2246,14 +2243,10 @@
}));
#ifdef DSN_ENABLE_GPERF
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.release-tcmalloc-memory"},
- "replica.release-tcmalloc-memory <true|false>",
- "replica.release-tcmalloc-memory - control if try to release tcmalloc memory",
- [this](const std::vector<std::string> &args) {
- return remote_command_set_bool_flag(
- _release_tcmalloc_memory, "release-tcmalloc-memory", args);
- }));
+ _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+ _release_tcmalloc_memory,
+ "replica.release-tcmalloc-memory",
+ "control if try to release tcmalloc memory"));
_cmds.emplace_back(::dsn::command_manager::instance().register_command(
{"replica.get-tcmalloc-status"},
@@ -2265,32 +2258,12 @@
return std::string(buf);
}));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.mem-release-max-reserved-percentage"},
- "replica.mem-release-max-reserved-percentage [num | DEFAULT]",
+ _cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
+ _mem_release_max_reserved_mem_percentage,
+ FLAGS_mem_release_max_reserved_mem_percentage,
+ "replica.mem-release-max-reserved-percentage",
"control tcmalloc max reserved but not-used memory percentage",
- [this](const std::vector<std::string> &args) {
- std::string result("OK");
- if (args.empty()) {
- // show current value
- result = "mem-release-max-reserved-percentage = " +
- std::to_string(_mem_release_max_reserved_mem_percentage);
- return result;
- }
- if (args[0] == "DEFAULT") {
- // set to default value
- _mem_release_max_reserved_mem_percentage =
- FLAGS_mem_release_max_reserved_mem_percentage;
- return result;
- }
- int32_t percentage = 0;
- if (!dsn::buf2int32(args[0], percentage) || percentage <= 0 || percentage > 100) {
- result = std::string("ERR: invalid arguments");
- } else {
- _mem_release_max_reserved_mem_percentage = percentage;
- }
- return result;
- }));
+ &check_mem_release_max_reserved_mem_percentage));
_cmds.emplace_back(::dsn::command_manager::instance().register_command(
{"replica.release-all-reserved-memory"},
@@ -2303,33 +2276,11 @@
#elif defined(DSN_USE_JEMALLOC)
register_jemalloc_ctrl_command();
#endif
- // TODO(yingchun): use http
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.max-concurrent-bulk-load-downloading-count"},
- "replica.max-concurrent-bulk-load-downloading-count [num | DEFAULT]",
- "control stub max_concurrent_bulk_load_downloading_count",
- [this](const std::vector<std::string> &args) {
- std::string result("OK");
- if (args.empty()) {
- result = "max_concurrent_bulk_load_downloading_count=" +
- std::to_string(_max_concurrent_bulk_load_downloading_count);
- return result;
- }
-
- if (args[0] == "DEFAULT") {
- _max_concurrent_bulk_load_downloading_count =
- _options.max_concurrent_bulk_load_downloading_count;
- return result;
- }
-
- int32_t count = 0;
- if (!dsn::buf2int32(args[0], count) || count <= 0) {
- result = std::string("ERR: invalid arguments");
- } else {
- _max_concurrent_bulk_load_downloading_count = count;
- }
- return result;
- }));
+ _cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
+ FLAGS_max_concurrent_bulk_load_downloading_count,
+ FLAGS_max_concurrent_bulk_load_downloading_count,
+ "replica.max-concurrent-bulk-load-downloading-count",
+ kMaxConcurrentBulkLoadDownloadingCountDesc));
});
}
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index a02921b..6fdc8ec 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -472,7 +472,6 @@
bool _verbose_commit_log;
bool _release_tcmalloc_memory;
int32_t _mem_release_max_reserved_mem_percentage;
- int32_t _max_concurrent_bulk_load_downloading_count;
// we limit LT_APP max concurrent count, because nfs service implementation is
// too simple, it do not support priority.
diff --git a/src/utils/command_manager.cpp b/src/utils/command_manager.cpp
index ab2e22f..0c5dd9b 100644
--- a/src/utils/command_manager.cpp
+++ b/src/utils/command_manager.cpp
@@ -26,6 +26,7 @@
#include "utils/command_manager.h"
+// IWYU pragma: no_include <ext/alloc_traits.h>
#include <stdlib.h>
#include <chrono>
#include <limits>
@@ -33,8 +34,6 @@
#include <thread>
#include <utility>
-#include "utils/fmt_logging.h"
-
namespace dsn {
std::unique_ptr<command_deregister>
@@ -43,31 +42,32 @@
const std::string &help_long,
command_handler handler)
{
- utils::auto_write_lock l(_lock);
- bool is_valid_cmd = false;
- for (const std::string &cmd : commands) {
- if (!cmd.empty()) {
- is_valid_cmd = true;
- CHECK(_handlers.find(cmd) == _handlers.end(), "command '{}' already regisered", cmd);
- }
- }
- CHECK(is_valid_cmd, "should not register empty command");
-
- command_instance *c = new command_instance();
+ auto *c = new command_instance();
c->commands = commands;
c->help_short = help_one_line;
c->help_long = help_long;
- c->handler = handler;
+ c->handler = std::move(handler);
- for (const std::string &cmd : commands) {
- if (!cmd.empty()) {
- _handlers[cmd] = c;
- }
+ utils::auto_write_lock l(_lock);
+ for (const auto &cmd : commands) {
+ CHECK(!cmd.empty(), "should not register empty command");
+ CHECK(_handlers.emplace(cmd, c).second, "command '{}' already registered", cmd);
}
return std::make_unique<command_deregister>(reinterpret_cast<uintptr_t>(c));
}
+std::unique_ptr<command_deregister> command_manager::register_bool_command(
+ bool &value, const std::string &command, const std::string &help)
+{
+ return register_command({command},
+ fmt::format("{} <true|false>", command),
+ help,
+ [&value, command](const std::vector<std::string> &args) {
+ return set_bool(value, command, args);
+ });
+}
+
void command_manager::deregister_command(uintptr_t handle)
{
auto c = reinterpret_cast<command_instance *>(handle);
@@ -99,6 +99,33 @@
}
}
+std::string command_manager::set_bool(bool &value,
+ const std::string &name,
+ const std::vector<std::string> &args)
+{
+ // Query.
+ if (args.empty()) {
+ return value ? "true" : "false";
+ }
+
+ // Invalid arguments size.
+ if (args.size() > 1) {
+ return fmt::format("ERR: invalid arguments, only one boolean argument is acceptable");
+ }
+
+ // Invalid argument.
+ bool new_value;
+ if (!dsn::buf2bool(args[0], new_value, /* ignore_case */ true)) {
+ return fmt::format("ERR: invalid arguments, '{}' is not a boolean", args[0]);
+ }
+
+ // Set to a new value.
+ value = new_value;
+ LOG_INFO("set {} to {} by remote command", name, new_value);
+
+ return "OK";
+}
+
command_manager::command_manager()
{
_cmds.emplace_back(register_command({"help", "h", "H", "Help"},
diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h
index a005c7f..26a6cb3 100644
--- a/src/utils/command_manager.h
+++ b/src/utils/command_manager.h
@@ -26,6 +26,7 @@
#pragma once
+#include <fmt/core.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <stdint.h>
#include <functional>
@@ -38,6 +39,8 @@
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils/singleton.h"
+#include "utils/string_conv.h"
+#include "utils/strings.h"
#include "utils/synchronize.h"
namespace dsn {
@@ -47,7 +50,7 @@
class command_manager : public ::dsn::utils::singleton<command_manager>
{
public:
- typedef std::function<std::string(const std::vector<std::string> &)> command_handler;
+ using command_handler = std::function<std::string(const std::vector<std::string> &)>;
std::unique_ptr<command_deregister>
register_command(const std::vector<std::string> &commands,
@@ -55,6 +58,35 @@
const std::string &help_long,
command_handler handler) WARN_UNUSED_RESULT;
+ // Register command which query or update a boolean configuration.
+ // The 'value' will be queried or updated by the command named 'command' with the 'help'
+ // description.
+ std::unique_ptr<command_deregister> register_bool_command(
+ bool &value, const std::string &command, const std::string &help) WARN_UNUSED_RESULT;
+
+ // Register command which query or update an integer configuration.
+ // The 'value' will be queried or updated by the command named 'command' with the 'help'
+ // description.
+ // 'validator' is used to validate the new value.
+ // The value is reset to 'default_value' if passing "DEFAULT" argument.
+ template <typename T>
+ WARN_UNUSED_RESULT std::unique_ptr<command_deregister>
+ register_int_command(T &value,
+ T default_value,
+ const std::string &command,
+ const std::string &help,
+ std::function<bool(int64_t new_value)> validator =
+ [](int64_t new_value) -> bool { return new_value >= 0; })
+ {
+ return register_command(
+ {command},
+ fmt::format("{} [num | DEFAULT]", command),
+ help,
+ [&value, default_value, command, validator](const std::vector<std::string> &args) {
+ return set_int(value, default_value, command, args, validator);
+ });
+ }
+
bool run_command(const std::string &cmd,
const std::vector<std::string> &args,
/*out*/ std::string &output);
@@ -76,6 +108,46 @@
void deregister_command(uintptr_t handle);
+ static std::string
+ set_bool(bool &value, const std::string &name, const std::vector<std::string> &args);
+
+ template <typename T>
+ static std::string set_int(T &value,
+ T default_value,
+ const std::string &name,
+ const std::vector<std::string> &args,
+ const std::function<bool(int64_t value)> &validator)
+ {
+ // Query.
+ if (args.empty()) {
+ return std::to_string(value);
+ }
+
+ // Invalid arguments size.
+ if (args.size() > 1) {
+ return fmt::format("ERR: invalid arguments, only one integer argument is acceptable");
+ }
+
+ // Reset to the default value.
+ if (dsn::utils::iequals(args[0], "DEFAULT")) {
+ value = default_value;
+ return "OK";
+ }
+
+ // Invalid argument.
+ T new_value = 0;
+ if (!internal::buf2signed(args[0], new_value) ||
+ !validator(static_cast<int64_t>(new_value))) {
+ return {"ERR: invalid arguments"};
+ }
+
+ // Set to a new value.
+ value = new_value;
+ LOG_INFO("set {} to {} by remote command", name, new_value);
+
+ return "OK";
+ }
+
typedef ref_ptr<command_instance> command_instance_ptr;
utils::rw_lock_nr _lock;
std::map<std::string, command_instance_ptr> _handlers;
@@ -86,7 +158,7 @@
class command_deregister
{
public:
- command_deregister(uintptr_t id) : cmd_id_(id) {}
+ explicit command_deregister(uintptr_t id) : cmd_id_(id) {}
~command_deregister()
{
if (cmd_id_ != 0) {
@@ -100,26 +172,3 @@
};
} // namespace dsn
-
-// if args are empty, then return the old flag;
-// otherwise set the proper "flag" according to args
-inline std::string remote_command_set_bool_flag(bool &flag,
- const char *flag_name,
- const std::vector<std::string> &args)
-{
- std::string ret_msg("OK");
- if (args.empty()) {
- ret_msg = flag ? "true" : "false";
- } else {
- if (args[0] == "true") {
- flag = true;
- LOG_INFO("set {} to true by remote command", flag_name);
- } else if (args[0] == "false") {
- flag = false;
- LOG_INFO("set {} to false by remote command", flag_name);
- } else {
- ret_msg = "ERR: invalid arguments";
- }
- }
- return ret_msg;
-}