refactor(command_manager): Simplify command_manager code (#1875)

- Simplify the registration of boolean remote command, now it can be registered
  by `ommand_manager::register_bool_command(bool &value, const std::string &command, const std::string &help)`
- Change the type of `node_live_percentage_threshold_for_update` from
  uint64 to int32, there is no impact bacause it's a percentage which
  is range in [0, 100].
- Change the type of `replica_assign_delay_ms_for_dropouts` from uint64
  to int64, there is less impact bacause int64 is big enough to set a
  milliseconds time interval (maximum to 106751991167 days).
- Change the type of `max_copy_rate_megabytes_per_disk`,
  `max_send_rate_megabytes_per_disk` to int64, there is no impact
  bacause its range is enlarged.
- Use FLAGS_max_concurrent_bulk_load_downloading_count directly and remove
  it from replication_options structure and replica_stub class.
diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp
index 3b0a7f0..b5d2571 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -48,10 +48,6 @@
 namespace replication {
 
 DSN_DEFINE_bool(replication, duplication_enabled, true, "is duplication enabled");
-DSN_DEFINE_int32(replication,
-                 max_concurrent_bulk_load_downloading_count,
-                 5,
-                 "concurrent bulk load downloading replica count");
 
 DSN_DEFINE_int32(replication,
                  mutation_2pc_min_replica_count,
@@ -144,8 +140,6 @@
 
     CHECK(!data_dirs.empty(), "no replica data dir found, maybe not set or excluded by black list");
 
-    max_concurrent_bulk_load_downloading_count = FLAGS_max_concurrent_bulk_load_downloading_count;
-
     CHECK(replica_helper::load_meta_servers(meta_servers), "invalid meta server config");
 }
 
diff --git a/src/common/replication_common.h b/src/common/replication_common.h
index 3771b2c..30412e7 100644
--- a/src/common/replication_common.h
+++ b/src/common/replication_common.h
@@ -64,8 +64,6 @@
     std::vector<std::string> data_dirs;
     std::vector<std::string> data_dir_tags;
 
-    int32_t max_concurrent_bulk_load_downloading_count;
-
 public:
     replication_options() = default;
     ~replication_options();
diff --git a/src/meta/app_balance_policy.cpp b/src/meta/app_balance_policy.cpp
index 972f8ba..00ebbc6 100644
--- a/src/meta/app_balance_policy.cpp
+++ b/src/meta/app_balance_policy.cpp
@@ -20,7 +20,6 @@
 #include <iterator>
 #include <map>
 #include <set>
-#include <string>
 
 #include "app_balance_policy.h"
 #include "common/gpid.h"
@@ -50,30 +49,18 @@
         _only_primary_balancer = false;
         _only_move_primary = false;
     }
-    _cmds.emplace_back(dsn::command_manager::instance().register_command(
-        {"meta.lb.balancer_in_turn"},
-        "meta.lb.balancer_in_turn <true|false>",
-        "control whether do app balancer in turn",
-        [this](const std::vector<std::string> &args) {
-            return remote_command_set_bool_flag(_balancer_in_turn, "lb.balancer_in_turn", args);
-        }));
+    _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+        _balancer_in_turn, "meta.lb.balancer_in_turn", "control whether do app balancer in turn"));
 
-    _cmds.emplace_back(dsn::command_manager::instance().register_command(
-        {"meta.lb.only_primary_balancer"},
-        "meta.lb.only_primary_balancer <true|false>",
-        "control whether do only primary balancer",
-        [this](const std::vector<std::string> &args) {
-            return remote_command_set_bool_flag(
-                _only_primary_balancer, "lb.only_primary_balancer", args);
-        }));
+    _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+        _only_primary_balancer,
+        "meta.lb.only_primary_balancer",
+        "control whether do only primary balancer"));
 
-    _cmds.emplace_back(dsn::command_manager::instance().register_command(
-        {"meta.lb.only_move_primary"},
-        "meta.lb.only_move_primary <true|false>",
-        "control whether only move primary in balancer",
-        [this](const std::vector<std::string> &args) {
-            return remote_command_set_bool_flag(_only_move_primary, "lb.only_move_primary", args);
-        }));
+    _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+        _only_move_primary,
+        "meta.lb.only_move_primary",
+        "control whether only move primary in balancer"));
 }
 
 void app_balance_policy::balance(bool checker, const meta_view *global_view, migration_list *list)
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 8fc64c1..952b33a 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -55,8 +55,8 @@
 #include "meta_service.h"
 #include "meta_split_service.h"
 #include "partition_split_types.h"
-#include "remote_cmd/remote_command.h"
 #include "ranger/ranger_resource_policy_manager.h"
+#include "remote_cmd/remote_command.h"
 #include "runtime/rpc/rpc_holder.h"
 #include "runtime/task/async_calls.h"
 #include "server_load_balancer.h"
@@ -67,7 +67,6 @@
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
-#include "utils/string_conv.h"
 #include "utils/strings.h"
 
 METRIC_DEFINE_counter(server,
@@ -112,11 +111,13 @@
                  lb_interval_ms,
                  10000,
                  "every this period(ms) the meta server will do load balance");
-DSN_DEFINE_uint64(meta_server,
-                  node_live_percentage_threshold_for_update,
-                  65,
-                  "If live_node_count * 100 < total_node_count * "
-                  "node_live_percentage_threshold_for_update, then freeze the cluster.");
+DSN_DEFINE_int32(meta_server,
+                 node_live_percentage_threshold_for_update,
+                 65,
+                 "If live_node_count * 100 < total_node_count * "
+                 "node_live_percentage_threshold_for_update, then freeze the cluster.");
+DSN_DEFINE_validator(node_live_percentage_threshold_for_update,
+                     [](int32_t value) -> bool { return value >= 0 && value <= 100; });
 DSN_DEFINE_string(meta_server,
                   meta_state_service_type,
                   "meta_state_service_simple",
@@ -303,29 +304,12 @@
 void meta_service::register_ctrl_commands()
 {
     _ctrl_node_live_percentage_threshold_for_update =
-        dsn::command_manager::instance().register_command(
-            {"meta.live_percentage"},
-            "meta.live_percentage [num | DEFAULT]",
+        dsn::command_manager::instance().register_int_command(
+            _node_live_percentage_threshold_for_update,
+            FLAGS_node_live_percentage_threshold_for_update,
+            "meta.live_percentage",
             "node live percentage threshold for update",
-            [this](const std::vector<std::string> &args) {
-                std::string result("OK");
-                if (args.empty()) {
-                    result = std::to_string(_node_live_percentage_threshold_for_update);
-                } else {
-                    if (args[0] == "DEFAULT") {
-                        _node_live_percentage_threshold_for_update =
-                            FLAGS_node_live_percentage_threshold_for_update;
-                    } else {
-                        int32_t v = 0;
-                        if (!dsn::buf2int32(args[0], v) || v < 0) {
-                            result = std::string("ERR: invalid arguments");
-                        } else {
-                            _node_live_percentage_threshold_for_update = v;
-                        }
-                    }
-                }
-                return result;
-            });
+            [](int32_t new_value) -> bool { return new_value >= 0 && new_value <= 100; });
 }
 
 void meta_service::start_service()
diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h
index e46ad90..4b531a9 100644
--- a/src/meta/meta_service.h
+++ b/src/meta/meta_service.h
@@ -338,7 +338,7 @@
 
     replication_options _opts;
     meta_options _meta_opts;
-    uint64_t _node_live_percentage_threshold_for_update;
+    int32_t _node_live_percentage_threshold_for_update;
     std::unique_ptr<command_deregister> _ctrl_node_live_percentage_threshold_for_update;
 
     std::shared_ptr<server_state> _state;
diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp
index 0c16106..d99ce22 100644
--- a/src/meta/partition_guardian.cpp
+++ b/src/meta/partition_guardian.cpp
@@ -21,6 +21,7 @@
 #include <inttypes.h>
 #include <stdio.h>
 #include <algorithm>
+#include <cstdint>
 #include <ostream>
 #include <unordered_map>
 
@@ -34,7 +35,6 @@
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/metrics.h"
-#include "utils/string_conv.h"
 #include "utils/strings.h"
 #include "utils/time_utils.h"
 
@@ -42,10 +42,10 @@
 namespace replication {
 
 DSN_DEFINE_int32(meta_server, max_replicas_in_group, 4, "max replicas(alive & dead) in a group");
-DSN_DEFINE_uint64(meta_server,
-                  replica_assign_delay_ms_for_dropouts,
-                  300000,
-                  "The delay milliseconds to dropout replicas assign");
+DSN_DEFINE_int64(meta_server,
+                 replica_assign_delay_ms_for_dropouts,
+                 300000,
+                 "The delay milliseconds to dropout replicas assign");
 
 partition_guardian::partition_guardian(meta_service *svc) : _svc(svc)
 {
@@ -681,12 +681,11 @@
 
 void partition_guardian::register_ctrl_commands()
 {
-    // TODO(yingchun): update _replica_assign_delay_ms_for_dropouts by http
-    _cmds.emplace_back(dsn::command_manager::instance().register_command(
-        {"meta.lb.assign_delay_ms"},
-        "lb.assign_delay_ms [num | DEFAULT]",
-        "control the replica_assign_delay_ms_for_dropouts config",
-        [this](const std::vector<std::string> &args) { return ctrl_assign_delay_ms(args); }));
+    _cmds.emplace_back(dsn::command_manager::instance().register_int_command(
+        _replica_assign_delay_ms_for_dropouts,
+        FLAGS_replica_assign_delay_ms_for_dropouts,
+        "meta.lb.assign_delay_ms",
+        "control the replica_assign_delay_ms_for_dropouts config"));
 
     _cmds.emplace_back(dsn::command_manager::instance().register_command(
         {"meta.lb.assign_secondary_black_list"},
@@ -697,26 +696,6 @@
         }));
 }
 
-std::string partition_guardian::ctrl_assign_delay_ms(const std::vector<std::string> &args)
-{
-    std::string result("OK");
-    if (args.empty()) {
-        result = std::to_string(_replica_assign_delay_ms_for_dropouts);
-    } else {
-        if (args[0] == "DEFAULT") {
-            _replica_assign_delay_ms_for_dropouts = FLAGS_replica_assign_delay_ms_for_dropouts;
-        } else {
-            int32_t v = 0;
-            if (!dsn::buf2int32(args[0], v) || v <= 0) {
-                result = std::string("ERR: invalid arguments");
-            } else {
-                _replica_assign_delay_ms_for_dropouts = v;
-            }
-        }
-    }
-    return result;
-}
-
 std::string
 partition_guardian::ctrl_assign_secondary_black_list(const std::vector<std::string> &args)
 {
diff --git a/src/meta/partition_guardian.h b/src/meta/partition_guardian.h
index 9c77da7..612a56a 100644
--- a/src/meta/partition_guardian.h
+++ b/src/meta/partition_guardian.h
@@ -74,7 +74,6 @@
     void finish_cure_proposal(meta_view &view,
                               const dsn::gpid &gpid,
                               const configuration_proposal_action &action);
-    std::string ctrl_assign_delay_ms(const std::vector<std::string> &args);
     std::string ctrl_assign_secondary_black_list(const std::vector<std::string> &args);
 
     void set_ddd_partition(ddd_partition_info &&partition)
@@ -103,7 +102,7 @@
     // ]
 
     std::vector<std::unique_ptr<command_deregister>> _cmds;
-    uint64_t _replica_assign_delay_ms_for_dropouts;
+    int64_t _replica_assign_delay_ms_for_dropouts;
 
     friend class meta_partition_guardian_test;
 };
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 9ff3c38..aef6d03 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -38,6 +38,7 @@
 #include <sstream> // IWYU pragma: keep
 #include <string>
 #include <thread>
+// IWYU pragma: no_include <type_traits>
 #include <unordered_map>
 
 #include "app_env_validator.h"
@@ -163,38 +164,16 @@
             return std::string(err.to_string());
         }));
 
-    _cmds.emplace_back(dsn::command_manager::instance().register_command(
-        {"meta.lb.add_secondary_enable_flow_control"},
-        "meta.lb.add_secondary_enable_flow_control <true|false>",
-        "control whether enable add secondary flow control",
-        [this](const std::vector<std::string> &args) {
-            return remote_command_set_bool_flag(
-                _add_secondary_enable_flow_control, "lb.add_secondary_enable_flow_control", args);
-        }));
+    _cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
+        _add_secondary_enable_flow_control,
+        "meta.lb.add_secondary_enable_flow_control",
+        "control whether enable add secondary flow control"));
 
-    _cmds.emplace_back(dsn::command_manager::instance().register_command(
-        {"meta.lb.add_secondary_max_count_for_one_node"},
-        "meta.lb.add_secondary_max_count_for_one_node [num | DEFAULT]",
-        "control the max count to add secondary for one node",
-        [this](const std::vector<std::string> &args) {
-            std::string result("OK");
-            if (args.empty()) {
-                result = std::to_string(_add_secondary_max_count_for_one_node);
-            } else {
-                if (args[0] == "DEFAULT") {
-                    _add_secondary_max_count_for_one_node =
-                        FLAGS_add_secondary_max_count_for_one_node;
-                } else {
-                    int32_t v = 0;
-                    if (!dsn::buf2int32(args[0], v) || v < 0) {
-                        result = std::string("ERR: invalid arguments");
-                    } else {
-                        _add_secondary_max_count_for_one_node = v;
-                    }
-                }
-            }
-            return result;
-        }));
+    _cmds.emplace_back(dsn::command_manager::instance().register_int_command(
+        _add_secondary_max_count_for_one_node,
+        FLAGS_add_secondary_max_count_for_one_node,
+        "meta.lb.add_secondary_max_count_for_one_node",
+        "control the max count to add secondary for one node"));
 }
 
 void server_state::initialize(meta_service *meta_svc, const std::string &apps_root)
diff --git a/src/meta/test/meta_test_base.cpp b/src/meta/test/meta_test_base.cpp
index 720efcf..6fc5371 100644
--- a/src/meta/test/meta_test_base.cpp
+++ b/src/meta/test/meta_test_base.cpp
@@ -116,7 +116,7 @@
     FLAGS_min_live_node_count_for_unfreeze = node_count;
 }
 
-void meta_test_base::set_node_live_percentage_threshold_for_update(uint64_t percentage_threshold)
+void meta_test_base::set_node_live_percentage_threshold_for_update(int32_t percentage_threshold)
 {
     _ms->_node_live_percentage_threshold_for_update = percentage_threshold;
 }
diff --git a/src/meta/test/meta_test_base.h b/src/meta/test/meta_test_base.h
index 9422e21..843ff91 100644
--- a/src/meta/test/meta_test_base.h
+++ b/src/meta/test/meta_test_base.h
@@ -53,7 +53,7 @@
 
     void set_min_live_node_count_for_unfreeze(uint64_t node_count);
 
-    void set_node_live_percentage_threshold_for_update(uint64_t percentage_threshold);
+    void set_node_live_percentage_threshold_for_update(int32_t percentage_threshold);
 
     std::vector<rpc_address> ensure_enough_alive_nodes(int min_node_count);
 
diff --git a/src/meta/test/update_configuration_test.cpp b/src/meta/test/update_configuration_test.cpp
index 2c23d23..d41d47e 100644
--- a/src/meta/test/update_configuration_test.cpp
+++ b/src/meta/test/update_configuration_test.cpp
@@ -70,9 +70,9 @@
 namespace dsn {
 namespace replication {
 
+DSN_DECLARE_int32(node_live_percentage_threshold_for_update);
+DSN_DECLARE_int64(replica_assign_delay_ms_for_dropouts);
 DSN_DECLARE_uint64(min_live_node_count_for_unfreeze);
-DSN_DECLARE_uint64(node_live_percentage_threshold_for_update);
-DSN_DECLARE_uint64(replica_assign_delay_ms_for_dropouts);
 
 class fake_sender_meta_service : public dsn::replication::meta_service
 {
diff --git a/src/nfs/nfs_client_impl.cpp b/src/nfs/nfs_client_impl.cpp
index b6d52e9..63cb1b8 100644
--- a/src/nfs/nfs_client_impl.cpp
+++ b/src/nfs/nfs_client_impl.cpp
@@ -30,6 +30,7 @@
 #include <mutex>
 
 #include "absl/strings/string_view.h"
+#include "fmt/core.h"
 #include "nfs/nfs_code_definition.h"
 #include "nfs/nfs_node.h"
 #include "utils/blob.h"
@@ -37,7 +38,6 @@
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
-#include "utils/string_conv.h"
 #include "utils/token_buckets.h"
 
 METRIC_DEFINE_counter(server,
@@ -69,17 +69,18 @@
                   nfs_copy_block_bytes,
                   4 * 1024 * 1024,
                   "max block size (bytes) for each network copy");
-DSN_DEFINE_uint32(
-    nfs,
-    max_copy_rate_megabytes_per_disk,
-    0,
-    "max rate per disk of copying from remote node(MB/s), zero means disable rate limiter");
+static const char *kMaxCopyRateMegaBytesPerDiskDesc =
+    "The maximum bandwidth (MB/s) of writing data per local disk when copying from remote node, 0 "
+    "means no limit";
+DSN_DEFINE_int64(nfs, max_copy_rate_megabytes_per_disk, 0, kMaxCopyRateMegaBytesPerDiskDesc);
 DSN_TAG_VARIABLE(max_copy_rate_megabytes_per_disk, FT_MUTABLE);
-// max_copy_rate_bytes should be zero or greater than nfs_copy_block_bytes which is the max
-// batch copy size once
+
+bool check_max_copy_rate_megabytes_per_disk(int64_t value)
+{
+    return value == 0 || (value << 20) > FLAGS_nfs_copy_block_bytes;
+}
 DSN_DEFINE_group_validator(max_copy_rate_megabytes_per_disk, [](std::string &message) -> bool {
-    return FLAGS_max_copy_rate_megabytes_per_disk == 0 ||
-           (FLAGS_max_copy_rate_megabytes_per_disk << 20) > FLAGS_nfs_copy_block_bytes;
+    return check_max_copy_rate_megabytes_per_disk(FLAGS_max_copy_rate_megabytes_per_disk);
 });
 
 DSN_DEFINE_int32(nfs,
@@ -582,33 +583,15 @@
 {
     static std::once_flag flag;
     std::call_once(flag, [&]() {
-        _nfs_max_copy_rate_megabytes_cmd = dsn::command_manager::instance().register_command(
-            {"nfs.max_copy_rate_megabytes_per_disk"},
-            "nfs.max_copy_rate_megabytes_per_disk [num]",
-            "control the max rate(MB/s) for one disk to copy file from remote node",
-            [](const std::vector<std::string> &args) {
-                std::string result("OK");
-
-                if (args.empty()) {
-                    return std::to_string(FLAGS_max_copy_rate_megabytes_per_disk);
-                }
-
-                int32_t max_copy_rate_megabytes = 0;
-                if (!dsn::buf2int32(args[0], max_copy_rate_megabytes) ||
-                    max_copy_rate_megabytes <= 0) {
-                    return std::string("ERR: invalid arguments");
-                }
-
-                uint32_t max_copy_rate_bytes = max_copy_rate_megabytes << 20;
-                if (max_copy_rate_bytes <= FLAGS_nfs_copy_block_bytes) {
-                    result = std::string("ERR: max_copy_rate_bytes(max_copy_rate_megabytes << 20) "
-                                         "should be greater than nfs_copy_block_bytes:")
-                                 .append(std::to_string(FLAGS_nfs_copy_block_bytes));
-                    return result;
-                }
-                FLAGS_max_copy_rate_megabytes_per_disk = max_copy_rate_megabytes;
-                return result;
-            });
+        _nfs_max_copy_rate_megabytes_cmd = dsn::command_manager::instance().register_int_command(
+            FLAGS_max_copy_rate_megabytes_per_disk,
+            FLAGS_max_copy_rate_megabytes_per_disk,
+            "nfs.max_copy_rate_megabytes_per_disk",
+            fmt::format("{}, "
+                        "should be greater than 'nfs_copy_block_bytes' which is {}",
+                        kMaxCopyRateMegaBytesPerDiskDesc,
+                        FLAGS_nfs_copy_block_bytes),
+            &check_max_copy_rate_megabytes_per_disk);
     });
 }
 } // namespace service
diff --git a/src/nfs/nfs_server_impl.cpp b/src/nfs/nfs_server_impl.cpp
index 6cce235..80fbea5 100644
--- a/src/nfs/nfs_server_impl.cpp
+++ b/src/nfs/nfs_server_impl.cpp
@@ -42,7 +42,6 @@
 #include "utils/env.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
-#include "utils/string_conv.h"
 #include "utils/utils.h"
 
 METRIC_DEFINE_counter(
@@ -62,11 +61,10 @@
 
 namespace service {
 
-DSN_DEFINE_uint32(
-    nfs,
-    max_send_rate_megabytes_per_disk,
-    0,
-    "max rate per disk of send to remote node(MB/s),zero means disable rate limiter");
+static const char *kMaxSendRateMegaBytesPerDiskDesc =
+    "The maximum bandwidth (MB/s) of reading data per local disk "
+    "when transferring data to remote node, 0 means no limit";
+DSN_DEFINE_int64(nfs, max_send_rate_megabytes_per_disk, 0, kMaxSendRateMegaBytesPerDiskDesc);
 DSN_TAG_VARIABLE(max_send_rate_megabytes_per_disk, FT_MUTABLE);
 
 DSN_DECLARE_int32(file_close_timer_interval_ms_on_server);
@@ -261,26 +259,11 @@
 {
     static std::once_flag flag;
     std::call_once(flag, [&]() {
-        _nfs_max_send_rate_megabytes_cmd = dsn::command_manager::instance().register_command(
-            {"nfs.max_send_rate_megabytes_per_disk"},
-            "nfs.max_send_rate_megabytes_per_disk [num]",
-            "control the max rate(MB/s) for one disk to send file to remote node",
-            [](const std::vector<std::string> &args) {
-                std::string result("OK");
-
-                if (args.empty()) {
-                    return std::to_string(FLAGS_max_send_rate_megabytes_per_disk);
-                }
-
-                int32_t max_send_rate_megabytes = 0;
-                if (!dsn::buf2int32(args[0], max_send_rate_megabytes) ||
-                    max_send_rate_megabytes <= 0) {
-                    return std::string("ERR: invalid arguments");
-                }
-
-                FLAGS_max_send_rate_megabytes_per_disk = max_send_rate_megabytes;
-                return result;
-            });
+        _nfs_max_send_rate_megabytes_cmd = dsn::command_manager::instance().register_int_command(
+            FLAGS_max_send_rate_megabytes_per_disk,
+            FLAGS_max_send_rate_megabytes_per_disk,
+            "nfs.max_send_rate_megabytes_per_disk",
+            kMaxSendRateMegaBytesPerDiskDesc);
     });
 }
 
diff --git a/src/replica/backup/cold_backup_context.cpp b/src/replica/backup/cold_backup_context.cpp
index fb7876a..a96dd94 100644
--- a/src/replica/backup/cold_backup_context.cpp
+++ b/src/replica/backup/cold_backup_context.cpp
@@ -20,6 +20,7 @@
 #include <chrono>
 #include <cstdint>
 #include <memory>
+// IWYU pragma: no_include <type_traits>
 
 #include "common/backup_common.h"
 #include "common/replication.codes.h"
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp
index ba32e41..1306eae 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -44,6 +44,7 @@
 #include "utils/env.h"
 #include "utils/fail_point.h"
 #include "utils/filesystem.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/load_dump_object.h"
 #include "utils/thread_access_checker.h"
@@ -83,6 +84,8 @@
                       dsn::metric_unit::kBytes,
                       "The size of files that have been downloaded successfully for bulk loads");
 
+DSN_DECLARE_int32(max_concurrent_bulk_load_downloading_count);
+
 namespace dsn {
 namespace dist {
 namespace block_service {
@@ -425,7 +428,7 @@
                                                const std::string &provider_name)
 {
     if (_stub->_bulk_load_downloading_count.load() >=
-        _stub->_max_concurrent_bulk_load_downloading_count) {
+        FLAGS_max_concurrent_bulk_load_downloading_count) {
         LOG_WARNING_PREFIX("node[{}] already has {} replica downloading, wait for next round",
                            _stub->_primary_address_str,
                            _stub->_bulk_load_downloading_count.load());
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 2f5a8e6..6a2ca4d 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -78,7 +78,6 @@
 #include "utils/ports.h"
 #include "utils/process_utils.h"
 #include "utils/rand.h"
-#include "utils/string_conv.h"
 #include "utils/strings.h"
 #include "utils/synchronize.h"
 #ifdef DSN_ENABLE_GPERF
@@ -90,6 +89,15 @@
 #include "remote_cmd/remote_command.h"
 #include "utils/fail_point.h"
 
+static const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
+    "The maximum concurrent bulk load downloading replica count";
+DSN_DEFINE_int32(replication,
+                 max_concurrent_bulk_load_downloading_count,
+                 5,
+                 kMaxConcurrentBulkLoadDownloadingCountDesc);
+DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
+                     [](int32_t value) -> bool { return value >= 0; });
+
 METRIC_DEFINE_gauge_int64(server,
                           total_replicas,
                           dsn::metric_unit::kReplicas,
@@ -294,6 +302,12 @@
     10,
     "if tcmalloc reserved but not-used memory exceed this percentage of application allocated "
     "memory, replica server will release the exceeding memory back to operating system");
+bool check_mem_release_max_reserved_mem_percentage(int32_t value)
+{
+    return value > 0 && value <= 100;
+}
+DSN_DEFINE_validator(mem_release_max_reserved_mem_percentage,
+                     &check_mem_release_max_reserved_mem_percentage);
 
 DSN_DEFINE_string(
     pegasus.server,
@@ -339,7 +353,6 @@
       _verbose_commit_log(false),
       _release_tcmalloc_memory(false),
       _mem_release_max_reserved_mem_percentage(10),
-      _max_concurrent_bulk_load_downloading_count(5),
       _learn_app_concurrent_count(0),
       _bulk_load_downloading_count(0),
       _manual_emergency_checkpointing_count(0),
@@ -406,8 +419,6 @@
     _verbose_commit_log = FLAGS_verbose_commit_log_on_start;
     _release_tcmalloc_memory = FLAGS_mem_release_enabled;
     _mem_release_max_reserved_mem_percentage = FLAGS_mem_release_max_reserved_mem_percentage;
-    _max_concurrent_bulk_load_downloading_count =
-        _options.max_concurrent_bulk_load_downloading_count;
 
     // clear dirs if need
     if (clear) {
@@ -2181,32 +2192,18 @@
                 return std::string(e.to_string());
             }));
 
-        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
-            {"replica.deny-client"},
-            "replica.deny-client <true|false>",
-            "replica.deny-client - control if deny client read & write request",
-            [this](const std::vector<std::string> &args) {
-                return remote_command_set_bool_flag(_deny_client, "deny-client", args);
-            }));
+        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+            _deny_client, "replica.deny-client", "control if deny client read & write request"));
 
-        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
-            {"replica.verbose-client-log"},
-            "replica.verbose-client-log <true|false>",
-            "replica.verbose-client-log - control if print verbose error log when reply read & "
-            "write request",
-            [this](const std::vector<std::string> &args) {
-                return remote_command_set_bool_flag(
-                    _verbose_client_log, "verbose-client-log", args);
-            }));
+        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+            _verbose_client_log,
+            "replica.verbose-client-log",
+            "control if print verbose error log when reply read & write request"));
 
-        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
-            {"replica.verbose-commit-log"},
-            "replica.verbose-commit-log <true|false>",
-            "replica.verbose-commit-log - control if print verbose log when commit mutation",
-            [this](const std::vector<std::string> &args) {
-                return remote_command_set_bool_flag(
-                    _verbose_commit_log, "verbose-commit-log", args);
-            }));
+        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+            _verbose_commit_log,
+            "replica.verbose-commit-log",
+            "control if print verbose log when commit mutation"));
 
         _cmds.emplace_back(::dsn::command_manager::instance().register_command(
             {"replica.trigger-checkpoint"},
@@ -2246,14 +2243,10 @@
             }));
 
 #ifdef DSN_ENABLE_GPERF
-        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
-            {"replica.release-tcmalloc-memory"},
-            "replica.release-tcmalloc-memory <true|false>",
-            "replica.release-tcmalloc-memory - control if try to release tcmalloc memory",
-            [this](const std::vector<std::string> &args) {
-                return remote_command_set_bool_flag(
-                    _release_tcmalloc_memory, "release-tcmalloc-memory", args);
-            }));
+        _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
+            _release_tcmalloc_memory,
+            "replica.release-tcmalloc-memory",
+            "control if try to release tcmalloc memory"));
 
         _cmds.emplace_back(::dsn::command_manager::instance().register_command(
             {"replica.get-tcmalloc-status"},
@@ -2265,32 +2258,12 @@
                 return std::string(buf);
             }));
 
-        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
-            {"replica.mem-release-max-reserved-percentage"},
-            "replica.mem-release-max-reserved-percentage [num | DEFAULT]",
+        _cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
+            _mem_release_max_reserved_mem_percentage,
+            FLAGS_mem_release_max_reserved_mem_percentage,
+            "replica.mem-release-max-reserved-percentage",
             "control tcmalloc max reserved but not-used memory percentage",
-            [this](const std::vector<std::string> &args) {
-                std::string result("OK");
-                if (args.empty()) {
-                    // show current value
-                    result = "mem-release-max-reserved-percentage = " +
-                             std::to_string(_mem_release_max_reserved_mem_percentage);
-                    return result;
-                }
-                if (args[0] == "DEFAULT") {
-                    // set to default value
-                    _mem_release_max_reserved_mem_percentage =
-                        FLAGS_mem_release_max_reserved_mem_percentage;
-                    return result;
-                }
-                int32_t percentage = 0;
-                if (!dsn::buf2int32(args[0], percentage) || percentage <= 0 || percentage > 100) {
-                    result = std::string("ERR: invalid arguments");
-                } else {
-                    _mem_release_max_reserved_mem_percentage = percentage;
-                }
-                return result;
-            }));
+            &check_mem_release_max_reserved_mem_percentage));
 
         _cmds.emplace_back(::dsn::command_manager::instance().register_command(
             {"replica.release-all-reserved-memory"},
@@ -2303,33 +2276,11 @@
 #elif defined(DSN_USE_JEMALLOC)
         register_jemalloc_ctrl_command();
 #endif
-        // TODO(yingchun): use http
-        _cmds.emplace_back(::dsn::command_manager::instance().register_command(
-            {"replica.max-concurrent-bulk-load-downloading-count"},
-            "replica.max-concurrent-bulk-load-downloading-count [num | DEFAULT]",
-            "control stub max_concurrent_bulk_load_downloading_count",
-            [this](const std::vector<std::string> &args) {
-                std::string result("OK");
-                if (args.empty()) {
-                    result = "max_concurrent_bulk_load_downloading_count=" +
-                             std::to_string(_max_concurrent_bulk_load_downloading_count);
-                    return result;
-                }
-
-                if (args[0] == "DEFAULT") {
-                    _max_concurrent_bulk_load_downloading_count =
-                        _options.max_concurrent_bulk_load_downloading_count;
-                    return result;
-                }
-
-                int32_t count = 0;
-                if (!dsn::buf2int32(args[0], count) || count <= 0) {
-                    result = std::string("ERR: invalid arguments");
-                } else {
-                    _max_concurrent_bulk_load_downloading_count = count;
-                }
-                return result;
-            }));
+        _cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
+            FLAGS_max_concurrent_bulk_load_downloading_count,
+            FLAGS_max_concurrent_bulk_load_downloading_count,
+            "replica.max-concurrent-bulk-load-downloading-count",
+            kMaxConcurrentBulkLoadDownloadingCountDesc));
     });
 }
 
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index a02921b..6fdc8ec 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -472,7 +472,6 @@
     bool _verbose_commit_log;
     bool _release_tcmalloc_memory;
     int32_t _mem_release_max_reserved_mem_percentage;
-    int32_t _max_concurrent_bulk_load_downloading_count;
 
     // we limit LT_APP max concurrent count, because nfs service implementation is
     // too simple, it do not support priority.
diff --git a/src/utils/command_manager.cpp b/src/utils/command_manager.cpp
index ab2e22f..0c5dd9b 100644
--- a/src/utils/command_manager.cpp
+++ b/src/utils/command_manager.cpp
@@ -26,6 +26,7 @@
 
 #include "utils/command_manager.h"
 
+// IWYU pragma: no_include <ext/alloc_traits.h>
 #include <stdlib.h>
 #include <chrono>
 #include <limits>
@@ -33,8 +34,6 @@
 #include <thread>
 #include <utility>
 
-#include "utils/fmt_logging.h"
-
 namespace dsn {
 
 std::unique_ptr<command_deregister>
@@ -43,31 +42,32 @@
                                   const std::string &help_long,
                                   command_handler handler)
 {
-    utils::auto_write_lock l(_lock);
-    bool is_valid_cmd = false;
-    for (const std::string &cmd : commands) {
-        if (!cmd.empty()) {
-            is_valid_cmd = true;
-            CHECK(_handlers.find(cmd) == _handlers.end(), "command '{}' already regisered", cmd);
-        }
-    }
-    CHECK(is_valid_cmd, "should not register empty command");
-
-    command_instance *c = new command_instance();
+    auto *c = new command_instance();
     c->commands = commands;
     c->help_short = help_one_line;
     c->help_long = help_long;
-    c->handler = handler;
+    c->handler = std::move(handler);
 
-    for (const std::string &cmd : commands) {
-        if (!cmd.empty()) {
-            _handlers[cmd] = c;
-        }
+    utils::auto_write_lock l(_lock);
+    for (const auto &cmd : commands) {
+        CHECK(!cmd.empty(), "should not register empty command");
+        CHECK(_handlers.emplace(cmd, c).second, "command '{}' already registered", cmd);
     }
 
     return std::make_unique<command_deregister>(reinterpret_cast<uintptr_t>(c));
 }
 
+std::unique_ptr<command_deregister> command_manager::register_bool_command(
+    bool &value, const std::string &command, const std::string &help)
+{
+    return register_command({command},
+                            fmt::format("{} <true|false>", command),
+                            help,
+                            [&value, command](const std::vector<std::string> &args) {
+                                return set_bool(value, command, args);
+                            });
+}
+
 void command_manager::deregister_command(uintptr_t handle)
 {
     auto c = reinterpret_cast<command_instance *>(handle);
@@ -99,6 +99,33 @@
     }
 }
 
+std::string command_manager::set_bool(bool &value,
+                                      const std::string &name,
+                                      const std::vector<std::string> &args)
+{
+    // Query.
+    if (args.empty()) {
+        return value ? "true" : "false";
+    }
+
+    // Invalid arguments size.
+    if (args.size() > 1) {
+        return fmt::format("ERR: invalid arguments, only one boolean argument is acceptable");
+    }
+
+    // Invalid argument.
+    bool new_value;
+    if (!dsn::buf2bool(args[0], new_value, /* ignore_case */ true)) {
+        return fmt::format("ERR: invalid arguments, '{}' is not a boolean", args[0]);
+    }
+
+    // Set to a new value.
+    value = new_value;
+    LOG_INFO("set {} to {} by remote command", name, new_value);
+
+    return "OK";
+}
+
 command_manager::command_manager()
 {
     _cmds.emplace_back(register_command({"help", "h", "H", "Help"},
diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h
index a005c7f..26a6cb3 100644
--- a/src/utils/command_manager.h
+++ b/src/utils/command_manager.h
@@ -26,6 +26,7 @@
 
 #pragma once
 
+#include <fmt/core.h>
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <stdint.h>
 #include <functional>
@@ -38,6 +39,8 @@
 #include "utils/fmt_logging.h"
 #include "utils/ports.h"
 #include "utils/singleton.h"
+#include "utils/string_conv.h"
+#include "utils/strings.h"
 #include "utils/synchronize.h"
 
 namespace dsn {
@@ -47,7 +50,7 @@
 class command_manager : public ::dsn::utils::singleton<command_manager>
 {
 public:
-    typedef std::function<std::string(const std::vector<std::string> &)> command_handler;
+    using command_handler = std::function<std::string(const std::vector<std::string> &)>;
 
     std::unique_ptr<command_deregister>
     register_command(const std::vector<std::string> &commands,
@@ -55,6 +58,35 @@
                      const std::string &help_long,
                      command_handler handler) WARN_UNUSED_RESULT;
 
+    // Register command which query or update a boolean configuration.
+    // The 'value' will be queried or updated by the command named 'command' with the 'help'
+    // description.
+    std::unique_ptr<command_deregister> register_bool_command(
+        bool &value, const std::string &command, const std::string &help) WARN_UNUSED_RESULT;
+
+    // Register command which query or update an integer configuration.
+    // The 'value' will be queried or updated by the command named 'command' with the 'help'
+    // description.
+    // 'validator' is used to validate the new value.
+    // The value is reset to 'default_value' if passing "DEFAULT" argument.
+    template <typename T>
+    WARN_UNUSED_RESULT std::unique_ptr<command_deregister>
+    register_int_command(T &value,
+                         T default_value,
+                         const std::string &command,
+                         const std::string &help,
+                         std::function<bool(int64_t new_value)> validator =
+                             [](int64_t new_value) -> bool { return new_value >= 0; })
+    {
+        return register_command(
+            {command},
+            fmt::format("{} [num | DEFAULT]", command),
+            help,
+            [&value, default_value, command, validator](const std::vector<std::string> &args) {
+                return set_int(value, default_value, command, args, validator);
+            });
+    }
+
     bool run_command(const std::string &cmd,
                      const std::vector<std::string> &args,
                      /*out*/ std::string &output);
@@ -76,6 +108,46 @@
 
     void deregister_command(uintptr_t handle);
 
+    static std::string
+    set_bool(bool &value, const std::string &name, const std::vector<std::string> &args);
+
+    template <typename T>
+    static std::string set_int(T &value,
+                               T default_value,
+                               const std::string &name,
+                               const std::vector<std::string> &args,
+                               const std::function<bool(int64_t value)> &validator)
+    {
+        // Query.
+        if (args.empty()) {
+            return std::to_string(value);
+        }
+
+        // Invalid arguments size.
+        if (args.size() > 1) {
+            return fmt::format("ERR: invalid arguments, only one integer argument is acceptable");
+        }
+
+        // Reset to the default value.
+        if (dsn::utils::iequals(args[0], "DEFAULT")) {
+            value = default_value;
+            return "OK";
+        }
+
+        // Invalid argument.
+        T new_value = 0;
+        if (!internal::buf2signed(args[0], new_value) ||
+            !validator(static_cast<int64_t>(new_value))) {
+            return {"ERR: invalid arguments"};
+        }
+
+        // Set to a new value.
+        value = new_value;
+        LOG_INFO("set {} to {} by remote command", name, new_value);
+
+        return "OK";
+    }
+
     typedef ref_ptr<command_instance> command_instance_ptr;
     utils::rw_lock_nr _lock;
     std::map<std::string, command_instance_ptr> _handlers;
@@ -86,7 +158,7 @@
 class command_deregister
 {
 public:
-    command_deregister(uintptr_t id) : cmd_id_(id) {}
+    explicit command_deregister(uintptr_t id) : cmd_id_(id) {}
     ~command_deregister()
     {
         if (cmd_id_ != 0) {
@@ -100,26 +172,3 @@
 };
 
 } // namespace dsn
-
-// if args are empty, then return the old flag;
-// otherwise set the proper "flag" according to args
-inline std::string remote_command_set_bool_flag(bool &flag,
-                                                const char *flag_name,
-                                                const std::vector<std::string> &args)
-{
-    std::string ret_msg("OK");
-    if (args.empty()) {
-        ret_msg = flag ? "true" : "false";
-    } else {
-        if (args[0] == "true") {
-            flag = true;
-            LOG_INFO("set {} to true by remote command", flag_name);
-        } else if (args[0] == "false") {
-            flag = false;
-            LOG_INFO("set {} to false by remote command", flag_name);
-        } else {
-            ret_msg = "ERR: invalid arguments";
-        }
-    }
-    return ret_msg;
-}