| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| #include <absl/strings/string_view.h> |
| // IWYU pragma: no_include <boost/detail/basic_pointerbuf.hpp> |
| // IWYU pragma: no_include <ext/alloc_traits.h> |
| #include <boost/lexical_cast.hpp> |
| #include <algorithm> // for std::remove_if |
| #include <chrono> |
| #include <functional> |
| #include <ostream> |
| #include <unordered_map> |
| #include <utility> |
| |
| #include "backup_types.h" |
| #include "bulk_load_types.h" |
| #include "common/common.h" |
| #include "common/gpid.h" |
| #include "common/replication.codes.h" |
| #include "dsn.layer2_types.h" |
| #include "duplication_types.h" |
| #include "meta/duplication/meta_duplication_service.h" |
| #include "meta/meta_backup_service.h" |
| #include "meta/meta_data.h" |
| #include "meta/meta_options.h" |
| #include "meta/meta_rpc_types.h" |
| #include "meta/meta_server_failure_detector.h" |
| #include "meta/meta_state_service.h" |
| #include "meta/meta_state_service_utils.h" |
| #include "meta/partition_guardian.h" |
| #include "meta_bulk_load_service.h" |
| #include "meta_service.h" |
| #include "meta_split_service.h" |
| #include "partition_split_types.h" |
| #include "ranger/ranger_resource_policy_manager.h" |
| #include "remote_cmd/remote_command.h" |
| #include "runtime/rpc/rpc_address.h" |
| #include "runtime/rpc/rpc_holder.h" |
| #include "runtime/task/async_calls.h" |
| #include "server_load_balancer.h" |
| #include "server_state.h" |
| #include "utils/autoref_ptr.h" |
| #include "utils/command_manager.h" |
| #include "utils/factory_store.h" |
| #include "utils/filesystem.h" |
| #include "utils/flags.h" |
| #include "utils/fmt_logging.h" |
| #include "utils/strings.h" |
| |
| DSN_DECLARE_string(hosts_list); |
| DSN_DEFINE_bool(meta_server, |
| recover_from_replica_server, |
| false, |
| "Whether to recover tables from replica servers when there is no " |
| "data of the tables in remote storage"); |
| DSN_DEFINE_bool(meta_server, cold_backup_disabled, true, "whether to disable cold backup"); |
| DSN_DEFINE_bool(meta_server, |
| enable_white_list, |
| false, |
| "whether to enable white list of replica servers"); |
| DSN_DEFINE_uint64(meta_server, |
| min_live_node_count_for_unfreeze, |
| 3, |
| "If the number of ALIVE nodes is less than this threshold, MetaServer will " |
| "also enter the 'freezed' protection state"); |
| DSN_TAG_VARIABLE(min_live_node_count_for_unfreeze, FT_MUTABLE); |
| DSN_DEFINE_validator(min_live_node_count_for_unfreeze, |
| [](uint64_t min_live_node_count) -> bool { return min_live_node_count > 0; }); |
| |
| DSN_DEFINE_int32(replication, |
| lb_interval_ms, |
| 10000, |
| "The interval milliseconds of meta server to execute load balance"); |
| DSN_DEFINE_int32(meta_server, |
| node_live_percentage_threshold_for_update, |
| 65, |
| "If the proportion of ALIVE nodes is less than this threshold, MetaServer will " |
| "enter the 'freezed' protection state"); |
| DSN_DEFINE_validator(node_live_percentage_threshold_for_update, |
| [](int32_t value) -> bool { return value >= 0 && value <= 100; }); |
| DSN_DEFINE_string(meta_server, |
| meta_state_service_type, |
| #ifdef MOCK_TEST |
| "meta_state_service_simple", |
| #else |
| "meta_state_service_zookeeper", |
| #endif |
| "The implementation class of metadata storage service"); |
| DSN_DEFINE_string(meta_server, |
| cluster_root, |
| "/", |
| "The root of the cluster meta state service to be stored on remote storage. " |
| "Different meta servers in the same cluster need to be configured with the " |
| "same value, while different clusters using different values if they share " |
| "the same remote storage"); |
| DSN_DEFINE_string(meta_server, |
| server_load_balancer_type, |
| "greedy_load_balancer", |
| "The implementation class of load balancer"); |
| DSN_DEFINE_string(meta_server, |
| partition_guardian_type, |
| "partition_guardian", |
| "partition guardian provider"); |
| |
| DSN_DECLARE_bool(duplication_enabled); |
| DSN_DECLARE_int32(fd_beacon_interval_seconds); |
| DSN_DECLARE_int32(fd_check_interval_seconds); |
| DSN_DECLARE_int32(fd_grace_seconds); |
| DSN_DECLARE_int32(fd_lease_seconds); |
| DSN_DECLARE_string(cold_backup_root); |
| |
| METRIC_DEFINE_counter(server, |
| replica_server_disconnections, |
| dsn::metric_unit::kDisconnections, |
| "The number of disconnections with replica servers"); |
| |
| METRIC_DEFINE_gauge_int64(server, |
| unalive_replica_servers, |
| dsn::metric_unit::kServers, |
| "The number of unalive replica servers"); |
| |
| METRIC_DEFINE_gauge_int64(server, |
| alive_replica_servers, |
| dsn::metric_unit::kServers, |
| "The number of alive replica servers"); |
| |
| namespace dsn { |
| namespace replication { |
| |
| #define CHECK_APP_ID_STATUS_AND_AUTHZ(app_id) \ |
| do { \ |
| const auto &_app_id = (app_id); \ |
| const auto &_app = _state->get_app(_app_id); \ |
| if (!_app) { \ |
| rpc.response().err = ERR_INVALID_PARAMETERS; \ |
| LOG_WARNING("reject request on app_id = {}", _app_id); \ |
| return; \ |
| } \ |
| const std::string &app_name = _app->app_name; \ |
| if (!check_status_and_authz(rpc, nullptr, app_name)) { \ |
| return; \ |
| } \ |
| } while (0) |
| |
| meta_service::meta_service() |
| : serverlet("meta_service"), |
| _failure_detector(nullptr), |
| _started(false), |
| _recovering(false), |
| METRIC_VAR_INIT_server(replica_server_disconnections), |
| METRIC_VAR_INIT_server(unalive_replica_servers), |
| METRIC_VAR_INIT_server(alive_replica_servers) |
| { |
| _opts.initialize(); |
| _meta_opts.initialize(); |
| _node_live_percentage_threshold_for_update = FLAGS_node_live_percentage_threshold_for_update; |
| _state.reset(new server_state()); |
| _function_level.store(_meta_opts.meta_function_level_on_start); |
| if (FLAGS_recover_from_replica_server) { |
| LOG_INFO("enter recovery mode for [meta_server].recover_from_replica_server = true"); |
| _recovering = true; |
| if (_meta_opts.meta_function_level_on_start > meta_function_level::fl_steady) { |
| LOG_INFO("meta server function level changed to fl_steady under recovery mode"); |
| _function_level.store(meta_function_level::fl_steady); |
| } |
| } |
| |
| _meta_op_status.store(meta_op_status::FREE); |
| } |
| |
| meta_service::~meta_service() { stop(); } |
| |
| void meta_service::stop() |
| { |
| zauto_write_lock l(_meta_lock); |
| if (!_started.load()) { |
| return; |
| } |
| _tracker.cancel_outstanding_tasks(); |
| _ctrl_node_live_percentage_threshold_for_update.reset(); |
| _failure_detector.reset(); |
| _balancer.reset(); |
| _partition_guardian.reset(); |
| _started = false; |
| } |
| |
| bool meta_service::check_freeze() const |
| { |
| zauto_lock l(_failure_detector->_lock); |
| if (_alive_set.size() < FLAGS_min_live_node_count_for_unfreeze) |
| return true; |
| int total = _alive_set.size() + _dead_set.size(); |
| return _alive_set.size() * 100 < _node_live_percentage_threshold_for_update * total; |
| } |
| |
| error_code meta_service::remote_storage_initialize() |
| { |
| // create storage |
| dsn::dist::meta_state_service *storage = |
| dsn::utils::factory_store<::dsn::dist::meta_state_service>::create( |
| FLAGS_meta_state_service_type, PROVIDER_TYPE_MAIN); |
| error_code err = storage->initialize(_meta_opts.meta_state_service_args); |
| if (err != ERR_OK) { |
| LOG_ERROR("init meta_state_service failed, err = {}", err); |
| return err; |
| } |
| _storage.reset(storage); |
| _meta_storage.reset(new mss::meta_storage(_storage.get(), &_tracker)); |
| |
| std::vector<std::string> slices; |
| utils::split_args(FLAGS_cluster_root, slices, '/'); |
| std::string current = ""; |
| for (unsigned int i = 0; i != slices.size(); ++i) { |
| current = utils::filesystem::concat_path_unix_style(current, slices[i]); |
| task_ptr tsk = |
| _storage->create_node(current, LPC_META_CALLBACK, [&err](error_code ec) { err = ec; }); |
| tsk->wait(); |
| if (err != ERR_OK && err != ERR_NODE_ALREADY_EXIST) { |
| LOG_ERROR("create node failed, node_path = {}, err = {}", current, err); |
| return err; |
| } |
| } |
| _cluster_root = current.empty() ? "/" : current; |
| |
| LOG_INFO("init meta_state_service succeed, cluster_root = {}", _cluster_root); |
| return ERR_OK; |
| } |
| |
| // visited in protection of failure_detector::_lock |
| void meta_service::set_node_state(const std::vector<host_port> &nodes, bool is_alive) |
| { |
| for (auto &node : nodes) { |
| if (is_alive) { |
| _alive_set.insert(node); |
| _dead_set.erase(node); |
| } else { |
| _alive_set.erase(node); |
| _dead_set.insert(node); |
| } |
| } |
| |
| METRIC_VAR_INCREMENT_BY(replica_server_disconnections, is_alive ? 0 : nodes.size()); |
| METRIC_VAR_SET(unalive_replica_servers, _dead_set.size()); |
| METRIC_VAR_SET(alive_replica_servers, _alive_set.size()); |
| |
| if (!_started) { |
| return; |
| } |
| for (const auto &hp : nodes) { |
| tasking::enqueue(LPC_META_STATE_HIGH, |
| nullptr, |
| std::bind(&server_state::on_change_node_state, _state.get(), hp, is_alive), |
| server_state::sStateHash); |
| } |
| } |
| |
| void meta_service::get_node_state(/*out*/ std::map<host_port, bool> &all_nodes) |
| { |
| zauto_lock l(_failure_detector->_lock); |
| for (auto &node : _alive_set) |
| all_nodes[node] = true; |
| for (auto &node : _dead_set) |
| all_nodes[node] = false; |
| } |
| |
| void meta_service::balancer_run() { _state->check_all_partitions(); } |
| |
| bool meta_service::try_lock_meta_op_status(meta_op_status op_status) |
| { |
| meta_op_status expected = meta_op_status::FREE; |
| if (!_meta_op_status.compare_exchange_strong(expected, op_status)) { |
| LOG_ERROR("LOCK meta op status failed, meta " |
| "server is busy, current op status is {}", |
| enum_to_string(expected)); |
| return false; |
| } |
| |
| LOG_INFO("LOCK meta op status to {}", enum_to_string(op_status)); |
| return true; |
| } |
| |
| void meta_service::unlock_meta_op_status() |
| { |
| LOG_INFO("UNLOCK meta op status from {}", enum_to_string(_meta_op_status.load())); |
| _meta_op_status.store(meta_op_status::FREE); |
| } |
| |
| void meta_service::register_ctrl_commands() |
| { |
| _ctrl_node_live_percentage_threshold_for_update = |
| dsn::command_manager::instance().register_int_command( |
| _node_live_percentage_threshold_for_update, |
| FLAGS_node_live_percentage_threshold_for_update, |
| "meta.live_percentage", |
| "node live percentage threshold for update", |
| [](int32_t new_value) -> bool { return new_value >= 0 && new_value <= 100; }); |
| } |
| |
| void meta_service::start_service() |
| { |
| zauto_lock l(_failure_detector->_lock); |
| |
| const meta_view view = _state->get_meta_view(); |
| for (auto &kv : *view.nodes) { |
| if (_dead_set.find(kv.first) == _dead_set.end()) |
| _alive_set.insert(kv.first); |
| } |
| |
| METRIC_VAR_SET(alive_replica_servers, _alive_set.size()); |
| |
| for (const auto &node : _alive_set) { |
| // sync alive set and the failure_detector |
| _failure_detector->unregister_worker(node); |
| _failure_detector->register_worker(node, true); |
| } |
| |
| _ranger_resource_policy_manager = |
| std::make_shared<ranger::ranger_resource_policy_manager>(this); |
| |
| _access_controller = security::create_meta_access_controller(_ranger_resource_policy_manager); |
| |
| _started = true; |
| for (const auto &node : _alive_set) { |
| tasking::enqueue(LPC_META_STATE_HIGH, |
| nullptr, |
| std::bind(&server_state::on_change_node_state, _state.get(), node, true), |
| server_state::sStateHash); |
| } |
| for (const auto &node : _dead_set) { |
| tasking::enqueue(LPC_META_STATE_HIGH, |
| nullptr, |
| std::bind(&server_state::on_change_node_state, _state.get(), node, false), |
| server_state::sStateHash); |
| } |
| |
| tasking::enqueue_timer(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&meta_service::balancer_run, this), |
| std::chrono::milliseconds(FLAGS_lb_interval_ms), |
| server_state::sStateHash, |
| std::chrono::milliseconds(FLAGS_lb_interval_ms)); |
| |
| if (!FLAGS_cold_backup_disabled) { |
| LOG_INFO("start backup service"); |
| tasking::enqueue(LPC_DEFAULT_CALLBACK, |
| nullptr, |
| std::bind(&backup_service::start, _backup_handler.get())); |
| } |
| |
| if (_bulk_load_svc) { |
| LOG_INFO("start bulk load service"); |
| tasking::enqueue(LPC_META_CALLBACK, tracker(), [this]() { |
| _bulk_load_svc->initialize_bulk_load_service(); |
| }); |
| } |
| } |
| |
| // the start function is executed in threadpool default |
| error_code meta_service::start() |
| { |
| CHECK(!_started, "meta service is already started"); |
| register_ctrl_commands(); |
| |
| error_code err; |
| |
| err = remote_storage_initialize(); |
| dreturn_not_ok_logged(err, "init remote storage failed, err = {}", err); |
| LOG_INFO("remote storage is successfully initialized"); |
| |
| // start failure detector, and try to acquire the leader lock |
| _failure_detector.reset(new meta_server_failure_detector(this)); |
| if (FLAGS_enable_white_list) |
| _failure_detector->set_allow_list(_meta_opts.replica_white_list); |
| _failure_detector->register_ctrl_commands(); |
| |
| CHECK_GT_MSG(FLAGS_fd_grace_seconds, FLAGS_fd_lease_seconds, ""); |
| err = _failure_detector->start(FLAGS_fd_check_interval_seconds, |
| FLAGS_fd_beacon_interval_seconds, |
| FLAGS_fd_lease_seconds, |
| FLAGS_fd_grace_seconds, |
| FLAGS_enable_white_list); |
| |
| dreturn_not_ok_logged(err, "start failure_detector failed, err = {}", err); |
| LOG_INFO("meta service failure detector is successfully started {}", |
| FLAGS_enable_white_list ? "with whitelist enabled" : ""); |
| |
| // should register rpc handlers before acquiring leader lock, so that this meta service |
| // can tell others who is the current leader |
| register_rpc_handlers(); |
| |
| // start remote command service before acquiring leader lock, |
| // so that the command line call can be handled |
| dist::cmd::register_remote_command_rpc(); |
| |
| _failure_detector->acquire_leader_lock(); |
| CHECK(_failure_detector->get_leader(nullptr), "must be primary at this point"); |
| LOG_INFO("{}({}) got the primary lock, start to recover server state from remote storage", |
| dsn_primary_host_port(), |
| dsn_primary_address()); |
| |
| // initialize the load balancer |
| server_load_balancer *balancer = utils::factory_store<server_load_balancer>::create( |
| FLAGS_server_load_balancer_type, PROVIDER_TYPE_MAIN, this); |
| _balancer.reset(balancer); |
| // register control command to singleton-container for load balancer |
| _balancer->register_ctrl_commands(); |
| |
| partition_guardian *guardian = utils::factory_store<partition_guardian>::create( |
| FLAGS_partition_guardian_type, PROVIDER_TYPE_MAIN, this); |
| _partition_guardian.reset(guardian); |
| _partition_guardian->register_ctrl_commands(); |
| |
| // initializing the backup_handler should after remote_storage be initialized, |
| // because we should use _cluster_root |
| if (!FLAGS_cold_backup_disabled) { |
| LOG_INFO("initialize backup handler"); |
| _backup_handler = std::make_shared<backup_service>( |
| this, |
| utils::filesystem::concat_path_unix_style(_cluster_root, "backup"), |
| FLAGS_cold_backup_root); |
| } |
| |
| _bulk_load_svc = std::make_unique<bulk_load_service>( |
| this, utils::filesystem::concat_path_unix_style(_cluster_root, "bulk_load")); |
| |
| // initialize the server_state |
| _state->initialize(this, utils::filesystem::concat_path_unix_style(_cluster_root, "apps")); |
| while ((err = _state->initialize_data_structure()) != ERR_OK) { |
| if (err == ERR_OBJECT_NOT_FOUND && FLAGS_recover_from_replica_server) { |
| LOG_INFO("can't find apps from remote storage, and " |
| "[meta_server].recover_from_replica_server = true, " |
| "administrator should recover this cluster manually later"); |
| return dsn::ERR_OK; |
| } |
| LOG_ERROR("initialize server state from remote storage failed, err = {}, retry ...", err); |
| } |
| |
| _state->recover_from_max_replica_count_env(); |
| |
| initialize_duplication_service(); |
| recover_duplication_from_meta_state(); |
| |
| _split_svc = std::make_unique<meta_split_service>(this); |
| |
| _state->register_cli_commands(); |
| |
| start_service(); |
| |
| LOG_INFO("start meta_service succeed"); |
| |
| return ERR_OK; |
| } |
| |
| void meta_service::register_rpc_handlers() |
| { |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_CONFIG_SYNC, "config_sync", &meta_service::on_config_sync); |
| register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, |
| "query_configuration_by_index", |
| &meta_service::on_query_configuration_by_index); |
| register_rpc_handler(RPC_CM_UPDATE_PARTITION_CONFIGURATION, |
| "update_configuration", |
| &meta_service::on_update_configuration); |
| register_rpc_handler(RPC_CM_CREATE_APP, "create_app", &meta_service::on_create_app); |
| register_rpc_handler(RPC_CM_DROP_APP, "drop_app", &meta_service::on_drop_app); |
| register_rpc_handler(RPC_CM_RECALL_APP, "recall_app", &meta_service::on_recall_app); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_RENAME_APP, "rename_app", &meta_service::on_rename_app); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_LIST_APPS, "list_apps", &meta_service::on_list_apps); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_LIST_NODES, "list_nodes", &meta_service::on_list_nodes); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_CLUSTER_INFO, "cluster_info", &meta_service::on_query_cluster_info); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_PROPOSE_BALANCER, "propose_balancer", &meta_service::on_propose_balancer); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_CONTROL_META, "control_meta_level", &meta_service::on_control_meta_level); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_START_RECOVERY, "start_recovery", &meta_service::on_start_recovery); |
| register_rpc_handler(RPC_CM_START_RESTORE, "start_restore", &meta_service::on_start_restore); |
| register_rpc_handler( |
| RPC_CM_ADD_BACKUP_POLICY, "add_backup_policy", &meta_service::on_add_backup_policy); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_QUERY_BACKUP_POLICY, "query_backup_policy", &meta_service::on_query_backup_policy); |
| register_rpc_handler_with_rpc_holder(RPC_CM_MODIFY_BACKUP_POLICY, |
| "modify_backup_policy", |
| &meta_service::on_modify_backup_policy); |
| register_rpc_handler_with_rpc_holder(RPC_CM_REPORT_RESTORE_STATUS, |
| "report_restore_status", |
| &meta_service::on_report_restore_status); |
| register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_RESTORE_STATUS, |
| "query_restore_status", |
| &meta_service::on_query_restore_status); |
| register_duplication_rpc_handlers(); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_UPDATE_APP_ENV, "update_app_env(set/del/clear)", &meta_service::update_app_env); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_DDD_DIAGNOSE, "ddd_diagnose", &meta_service::ddd_diagnose); |
| register_rpc_handler_with_rpc_holder(RPC_CM_START_PARTITION_SPLIT, |
| "start_partition_split", |
| &meta_service::on_start_partition_split); |
| register_rpc_handler_with_rpc_holder(RPC_CM_CONTROL_PARTITION_SPLIT, |
| "control_partition_split(pause/restart/cancel)", |
| &meta_service::on_control_partition_split); |
| register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_SPLIT, |
| "query_partition_split", |
| &meta_service::on_query_partition_split); |
| register_rpc_handler_with_rpc_holder(RPC_CM_REGISTER_CHILD_REPLICA, |
| "register_child_on_meta", |
| &meta_service::on_register_child_on_meta); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_NOTIFY_STOP_SPLIT, "notify_stop_split", &meta_service::on_notify_stop_split); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_QUERY_CHILD_STATE, "query_child_state", &meta_service::on_query_child_state); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_START_BULK_LOAD, "start_bulk_load", &meta_service::on_start_bulk_load); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_CONTROL_BULK_LOAD, "control_bulk_load", &meta_service::on_control_bulk_load); |
| register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_BULK_LOAD_STATUS, |
| "query_bulk_load_status", |
| &meta_service::on_query_bulk_load_status); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_CLEAR_BULK_LOAD, "clear_bulk_load", &meta_service::on_clear_bulk_load); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_QUERY_BACKUP_STATUS, "query_backup_status", &meta_service::on_query_backup_status); |
| register_rpc_handler_with_rpc_holder(RPC_CM_START_MANUAL_COMPACT, |
| "start_manual_compact", |
| &meta_service::on_start_manual_compact); |
| register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, |
| "query_manual_compact_status", |
| &meta_service::on_query_manual_compact_status); |
| register_rpc_handler_with_rpc_holder(RPC_CM_GET_MAX_REPLICA_COUNT, |
| "get_max_replica_count", |
| &meta_service::on_get_max_replica_count); |
| register_rpc_handler_with_rpc_holder(RPC_CM_SET_MAX_REPLICA_COUNT, |
| "set_max_replica_count", |
| &meta_service::on_set_max_replica_count); |
| } |
| |
| meta_leader_state meta_service::check_leader(dsn::message_ex *req, dsn::host_port *forward_address) |
| { |
| host_port leader; |
| if (!_failure_detector->get_leader(&leader)) { |
| if (!req->header->context.u.is_forward_supported) { |
| if (forward_address != nullptr) |
| *forward_address = leader; |
| return meta_leader_state::kNotLeaderAndCannotForwardRpc; |
| } |
| |
| LOG_DEBUG("leader address: {}", leader); |
| if (!leader.is_invalid()) { |
| dsn_rpc_forward(req, dsn::dns_resolver::instance().resolve_address(leader)); |
| return meta_leader_state::kNotLeaderAndCanForwardRpc; |
| } else { |
| if (forward_address != nullptr) |
| forward_address->reset(); |
| return meta_leader_state::kNotLeaderAndCannotForwardRpc; |
| } |
| } |
| return meta_leader_state::kIsLeader; |
| } |
| |
| // table operations |
| void meta_service::on_create_app(dsn::message_ex *req) |
| { |
| if (!check_status_and_authz_with_reply<configuration_create_app_request, |
| configuration_create_app_response>(req)) { |
| return; |
| } |
| |
| req->add_ref(); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::create_app, _state.get(), req), |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_drop_app(dsn::message_ex *req) |
| { |
| if (!check_status_and_authz_with_reply<configuration_drop_app_request, |
| configuration_drop_app_response>(req)) { |
| return; |
| } |
| |
| req->add_ref(); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::drop_app, _state.get(), req), |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_rename_app(configuration_rename_app_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().old_app_name)) { |
| return; |
| } |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| std::bind(&server_state::rename_app, _state.get(), rpc), |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_recall_app(dsn::message_ex *req) |
| { |
| configuration_recall_app_request request; |
| configuration_recall_app_response response; |
| dsn::message_ex *copied_req = message_ex::copy_message_no_reply(*req); |
| dsn::unmarshall(copied_req, request); |
| auto target_app = _state->get_app(request.app_id); |
| if (!target_app) { |
| response.err = ERR_APP_NOT_EXIST; |
| reply(req, response); |
| return; |
| } |
| const std::string &app_name = target_app->app_name; |
| |
| if (!check_status_and_authz_with_reply(req, response, app_name)) { |
| return; |
| } |
| // check new_app_name reasonable. |
| // when the Ranger ACL is enabled, ensure that the prefix of new_app_name is consistent with |
| // old, or it is empty |
| if (_access_controller->is_enable_ranger_acl() && !request.new_app_name.empty()) { |
| std::string app_name_prefix = ranger::get_database_name_from_app_name(app_name); |
| std::string new_app_name_prefix = |
| ranger::get_database_name_from_app_name(request.new_app_name); |
| if (app_name_prefix != new_app_name_prefix) { |
| response.err = ERR_INVALID_PARAMETERS; |
| reply(req, response); |
| return; |
| } |
| } |
| |
| req->add_ref(); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::recall_app, _state.get(), req), |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_list_apps(configuration_list_apps_rpc rpc) |
| { |
| if (!check_leader_status(rpc)) { |
| return; |
| } |
| |
| dsn::message_ex *msg = nullptr; |
| if (_access_controller->is_enable_ranger_acl()) { |
| msg = rpc.dsn_request(); |
| } |
| _state->list_apps(rpc.request(), rpc.response(), msg); |
| } |
| |
| void meta_service::on_list_nodes(configuration_list_nodes_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc)) { |
| return; |
| } |
| |
| configuration_list_nodes_response &response = rpc.response(); |
| const configuration_list_nodes_request &request = rpc.request(); |
| { |
| zauto_lock l(_failure_detector->_lock); |
| dsn::replication::node_info info; |
| if (request.status == node_status::NS_INVALID || request.status == node_status::NS_ALIVE) { |
| info.status = node_status::NS_ALIVE; |
| for (auto &node : _alive_set) { |
| info.address = dsn::dns_resolver::instance().resolve_address(node); |
| info.__set_hp_address(node); |
| response.infos.push_back(info); |
| } |
| } |
| if (request.status == node_status::NS_INVALID || |
| request.status == node_status::NS_UNALIVE) { |
| info.status = node_status::NS_UNALIVE; |
| for (auto &node : _dead_set) { |
| info.address = dsn::dns_resolver::instance().resolve_address(node); |
| info.__set_hp_address(node); |
| response.infos.push_back(info); |
| } |
| } |
| response.err = dsn::ERR_OK; |
| } |
| } |
| |
| void meta_service::on_query_cluster_info(configuration_cluster_info_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc)) { |
| return; |
| } |
| |
| std::stringstream oss; |
| configuration_cluster_info_response &response = rpc.response(); |
| response.keys.push_back("meta_servers"); |
| for (size_t i = 0; i < _opts.meta_servers.size(); ++i) { |
| if (i != 0) |
| oss << ","; |
| oss << _opts.meta_servers[i]; |
| } |
| |
| response.values.push_back(oss.str()); |
| response.keys.push_back("primary_meta_server"); |
| response.values.push_back(dsn_primary_host_port().to_string()); |
| response.keys.push_back("zookeeper_hosts"); |
| response.values.push_back(FLAGS_hosts_list); |
| response.keys.push_back("zookeeper_root"); |
| response.values.push_back(_cluster_root); |
| response.keys.push_back("meta_function_level"); |
| response.values.push_back( |
| _meta_function_level_VALUES_TO_NAMES.find(get_function_level())->second + 3); |
| response.keys.push_back("balance_operation_count"); |
| std::vector<std::string> balance_operation_type; |
| balance_operation_type.emplace_back(std::string("detail")); |
| response.values.push_back(_balancer->get_balance_operation_count(balance_operation_type)); |
| double primary_stddev, total_stddev; |
| _state->get_cluster_balance_score(primary_stddev, total_stddev); |
| response.keys.push_back("primary_replica_count_stddev"); |
| response.values.push_back(fmt::format("{:.{}f}", primary_stddev, 2)); |
| response.keys.push_back("total_replica_count_stddev"); |
| response.values.push_back(fmt::format("{:.{}f}", total_stddev, 2)); |
| response.keys.push_back("cluster_name"); |
| response.values.push_back(get_current_cluster_name()); |
| response.err = dsn::ERR_OK; |
| } |
| |
| // client => meta server |
| void meta_service::on_query_configuration_by_index(configuration_query_by_index_rpc rpc) |
| { |
| query_cfg_response &response = rpc.response(); |
| host_port forward_hp; |
| if (!check_status_and_authz(rpc, &forward_hp)) { |
| if (!forward_hp.is_invalid()) { |
| partition_configuration config; |
| config.primary = dsn::dns_resolver::instance().resolve_address(forward_hp); |
| config.__set_hp_primary(forward_hp); |
| response.partitions.push_back(std::move(config)); |
| } |
| return; |
| } |
| |
| _state->query_configuration_by_index(rpc.request(), response); |
| if (ERR_OK == response.err) { |
| LOG_INFO("client {} queried an available app {} with appid {}", |
| rpc.dsn_request()->header->from_address, |
| rpc.request().app_name, |
| response.app_id); |
| } |
| } |
| |
| // partition sever => meta sever |
| // as get stale configuration is not allowed for partition server, we need to dispatch it to the |
| // meta state thread pool |
| void meta_service::on_config_sync(configuration_query_by_node_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc)) { |
| return; |
| } |
| |
| { |
| // this code piece should be referenced together with meta_service::set_node_state. |
| // In which, the replica server's failure event is dispatched to the meta_state_thread with |
| // the protection |
| // of failure_detector::_lock. Here we use this lock again, to make sure the config_sync rpc |
| // AFTER the node dead is dispatch |
| // AFTER the node dead event |
| zauto_lock l(_failure_detector->_lock); |
| tasking::enqueue(LPC_META_STATE_HIGH, |
| nullptr, |
| std::bind(&server_state::on_config_sync, _state.get(), rpc), |
| server_state::sStateHash); |
| } |
| } |
| |
| void meta_service::on_update_configuration(dsn::message_ex *req) |
| { |
| configuration_update_response response; |
| if (!check_status_and_authz_with_reply(req, response)) { |
| return; |
| } |
| |
| std::shared_ptr<configuration_update_request> request = |
| std::make_shared<configuration_update_request>(); |
| dsn::unmarshall(req, *request); |
| |
| meta_function_level::type level = get_function_level(); |
| if (level <= meta_function_level::fl_freezed) { |
| response.err = ERR_STATE_FREEZED; |
| _state->query_configuration_by_gpid(request->config.pid, response.config); |
| reply(req, response); |
| |
| LOG_INFO("refuse request {} coz meta function level is {}", |
| boost::lexical_cast<std::string>(*request), |
| _meta_function_level_VALUES_TO_NAMES.find(level)->second); |
| return; |
| } |
| |
| req->add_ref(); |
| tasking::enqueue(LPC_META_STATE_HIGH, |
| nullptr, |
| std::bind(&server_state::on_update_configuration, _state.get(), request, req), |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_control_meta_level(configuration_meta_control_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc)) { |
| return; |
| } |
| |
| const configuration_meta_control_request &request = rpc.request(); |
| configuration_meta_control_response &response = rpc.response(); |
| response.err = ERR_OK; |
| response.old_level = _function_level.load(); |
| if (request.level == meta_function_level::fl_invalid) { |
| return; |
| } |
| |
| if (request.level <= meta_function_level::fl_steady) { |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::clear_proposals, _state.get()), |
| server_state::sStateHash); |
| } |
| |
| _function_level.store(request.level); |
| } |
| |
| void meta_service::on_propose_balancer(configuration_balancer_rpc rpc) |
| { |
| CHECK_APP_ID_STATUS_AND_AUTHZ(rpc.request().gpid.get_app_id()); |
| const configuration_balancer_request &request = rpc.request(); |
| LOG_INFO("get proposal balancer request, gpid({})", request.gpid); |
| _state->on_propose_balancer(request, rpc.response()); |
| } |
| |
| void meta_service::on_start_recovery(configuration_recovery_rpc rpc) |
| { |
| configuration_recovery_response &response = rpc.response(); |
| LOG_INFO("got start recovery request, start to do recovery"); |
| auto result = check_leader(rpc, nullptr); |
| // request has been forwarded to others |
| if (result == meta_leader_state::kNotLeaderAndCanForwardRpc) { |
| return; |
| } |
| |
| if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc) { |
| response.err = ERR_FORWARD_TO_OTHERS; |
| } else { |
| zauto_write_lock l(_meta_lock); |
| if (_started.load()) { |
| LOG_INFO("service({}({})) is already started, ignore the recovery request", |
| dsn_primary_host_port(), |
| dsn_primary_address()); |
| response.err = ERR_SERVICE_ALREADY_RUNNING; |
| } else { |
| _state->on_start_recovery(rpc.request(), response); |
| if (response.err == dsn::ERR_OK) { |
| _recovering = false; |
| start_service(); |
| } |
| } |
| } |
| } |
| |
| void meta_service::on_start_restore(dsn::message_ex *req) |
| { |
| if (!check_status_and_authz_with_reply<configuration_restore_request, |
| configuration_create_app_response>(req)) { |
| return; |
| } |
| |
| req->add_ref(); |
| tasking::enqueue( |
| LPC_RESTORE_BACKGROUND, nullptr, std::bind(&server_state::restore_app, _state.get(), req)); |
| } |
| |
| // TODO(heyuchen): implement it |
| void meta_service::on_add_backup_policy(dsn::message_ex *req) {} |
| |
| // TODO(heyuchen): implement it |
| void meta_service::on_query_backup_policy(query_backup_policy_rpc policy_rpc) {} |
| |
| // TODO(heyuchen): implement it |
| void meta_service::on_modify_backup_policy(configuration_modify_backup_policy_rpc rpc) {} |
| |
| void meta_service::on_report_restore_status(configuration_report_restore_status_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc)) { |
| return; |
| } |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::on_recv_restore_report, _state.get(), rpc)); |
| } |
| |
| void meta_service::on_query_restore_status(configuration_query_restore_rpc rpc) |
| { |
| CHECK_APP_ID_STATUS_AND_AUTHZ(rpc.request().restore_app_id); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::on_query_restore_status, _state.get(), rpc)); |
| } |
| |
| void meta_service::on_add_duplication(duplication_add_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (!_dup_svc) { |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _dup_svc->add_duplication(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_modify_duplication(duplication_modify_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (!_dup_svc) { |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _dup_svc->modify_duplication(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_query_duplication_info(duplication_query_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (_dup_svc) { |
| _dup_svc->query_duplication_info(rpc.request(), rpc.response()); |
| } else { |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| } |
| } |
| |
| void meta_service::on_duplication_sync(duplication_sync_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc)) { |
| return; |
| } |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { |
| if (_dup_svc) { |
| _dup_svc->duplication_sync(std::move(rpc)); |
| } else { |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| } |
| }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::recover_duplication_from_meta_state() |
| { |
| if (_dup_svc) { |
| _dup_svc->recover_from_meta_state(); |
| } |
| } |
| |
| void meta_service::register_duplication_rpc_handlers() |
| { |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_ADD_DUPLICATION, "add_duplication", &meta_service::on_add_duplication); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_MODIFY_DUPLICATION, "modify duplication", &meta_service::on_modify_duplication); |
| register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_DUPLICATION, |
| "query duplication info", |
| &meta_service::on_query_duplication_info); |
| register_rpc_handler_with_rpc_holder( |
| RPC_CM_DUPLICATION_SYNC, "sync duplication", &meta_service::on_duplication_sync); |
| } |
| |
| void meta_service::initialize_duplication_service() |
| { |
| if (FLAGS_duplication_enabled) { |
| _dup_svc = std::make_unique<meta_duplication_service>(_state.get(), this); |
| } |
| } |
| |
| void meta_service::update_app_env(app_env_rpc env_rpc) |
| { |
| if (!check_status_and_authz(env_rpc, nullptr, env_rpc.request().app_name)) { |
| return; |
| } |
| |
| auto &response = env_rpc.response(); |
| app_env_operation::type op = env_rpc.request().op; |
| switch (op) { |
| case app_env_operation::type::APP_ENV_OP_SET: |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::set_app_envs, _state.get(), env_rpc)); |
| break; |
| case app_env_operation::type::APP_ENV_OP_DEL: |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::del_app_envs, _state.get(), env_rpc)); |
| break; |
| case app_env_operation::type::APP_ENV_OP_CLEAR: |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::clear_app_envs, _state.get(), env_rpc)); |
| break; |
| default: // app_env_operation::type::APP_ENV_OP_INVALID |
| LOG_WARNING("recv a invalid update app_env request, just ignore"); |
| response.err = ERR_INVALID_PARAMETERS; |
| response.hint_message = |
| "recv a invalid update_app_env request with op = APP_ENV_OP_INVALID"; |
| break; |
| } |
| } |
| |
| void meta_service::ddd_diagnose(ddd_diagnose_rpc rpc) |
| { |
| CHECK_APP_ID_STATUS_AND_AUTHZ(rpc.request().pid.get_app_id()); |
| auto &response = rpc.response(); |
| get_partition_guardian()->get_ddd_partitions(rpc.request().pid, response.partitions); |
| response.err = ERR_OK; |
| } |
| |
| void meta_service::on_start_partition_split(start_split_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| if (_split_svc == nullptr) { |
| LOG_ERROR("meta doesn't support partition split"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _split_svc->start_partition_split(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_control_partition_split(control_split_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (_split_svc == nullptr) { |
| LOG_ERROR("meta doesn't support partition split"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _split_svc->control_partition_split(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_query_partition_split(query_split_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (_split_svc == nullptr) { |
| LOG_ERROR("meta doesn't support partition split"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| _split_svc->query_partition_split(std::move(rpc)); |
| } |
| |
| void meta_service::on_register_child_on_meta(register_child_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app.app_name)) { |
| return; |
| } |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _split_svc->register_child_on_meta(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_notify_stop_split(notify_stop_split_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| if (_split_svc == nullptr) { |
| LOG_ERROR("meta doesn't support partition split"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _split_svc->notify_stop_split(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_query_child_state(query_child_state_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| if (_split_svc == nullptr) { |
| LOG_ERROR("meta doesn't support partition split"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| _split_svc->query_child_state(std::move(rpc)); |
| } |
| |
| void meta_service::on_start_bulk_load(start_bulk_load_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (_bulk_load_svc == nullptr) { |
| LOG_ERROR("meta doesn't support bulk load"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| _bulk_load_svc->on_start_bulk_load(std::move(rpc)); |
| } |
| |
| void meta_service::on_control_bulk_load(control_bulk_load_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (_bulk_load_svc == nullptr) { |
| LOG_ERROR("meta doesn't support bulk load"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _bulk_load_svc->on_control_bulk_load(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| void meta_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (_bulk_load_svc == nullptr) { |
| LOG_ERROR("meta doesn't support bulk load"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| _bulk_load_svc->on_query_bulk_load_status(std::move(rpc)); |
| } |
| |
| void meta_service::on_clear_bulk_load(clear_bulk_load_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| if (_bulk_load_svc == nullptr) { |
| LOG_ERROR("meta doesn't support bulk load"); |
| rpc.response().err = ERR_SERVICE_NOT_ACTIVE; |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| [this, rpc]() { _bulk_load_svc->on_clear_bulk_load(std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| // TODO(heyuchen): implement it |
| void meta_service::on_start_backup_app(start_backup_app_rpc rpc) {} |
| |
| // TODO(heyuchen): implement it |
| void meta_service::on_query_backup_status(query_backup_status_rpc rpc) {} |
| |
| size_t meta_service::get_alive_node_count() const |
| { |
| zauto_lock l(_failure_detector->_lock); |
| return _alive_set.size(); |
| } |
| |
| void meta_service::on_start_manual_compact(start_manual_compact_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::on_start_manual_compact, _state.get(), rpc)); |
| } |
| |
| void meta_service::on_query_manual_compact_status(query_manual_compact_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| nullptr, |
| std::bind(&server_state::on_query_manual_compact_status, _state.get(), rpc)); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void meta_service::on_get_max_replica_count(configuration_get_max_replica_count_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| std::bind(&server_state::get_max_replica_count, _state.get(), rpc), |
| server_state::sStateHash); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void meta_service::on_set_max_replica_count(configuration_set_max_replica_count_rpc rpc) |
| { |
| if (!check_status_and_authz(rpc, nullptr, rpc.request().app_name)) { |
| return; |
| } |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| tracker(), |
| std::bind(&server_state::set_max_replica_count, _state.get(), rpc), |
| server_state::sStateHash); |
| } |
| |
| } // namespace replication |
| } // namespace dsn |