blob: 1db53b83711ae5e091da54c3479f888cfd51faa3 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#pragma once
#include <fmt/core.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "block_service/block_service_manager.h"
#include "common/bulk_load_common.h"
#include "common/duplication_common.h"
#include "common/manual_compact.h"
#include "common/partition_split_common.h"
#include "common/replication_common.h"
#include "meta_admin_types.h"
#include "meta_options.h"
#include "meta_rpc_types.h"
#include "meta_server_failure_detector.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/network.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
#include "security/access_controller.h"
#include "runtime/serverlet.h"
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_tracker.h"
#include "utils/autoref_ptr.h"
#include "utils/enum_helper.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
#include "utils/threadpool_code.h"
#include "utils/zlocks.h"
namespace dsn {
class command_deregister;
namespace ranger {
class ranger_resource_policy_manager;
} // namespace ranger
namespace dist {
class meta_state_service;
} // namespace dist
namespace replication {
class backup_service;
class bulk_load_service;
class meta_duplication_service;
class meta_split_service;
class partition_guardian;
class server_load_balancer;
class server_state;
namespace mss {
struct meta_storage;
} // namespace mss
namespace test {
class test_checker;
}
DEFINE_TASK_CODE(LPC_DEFAULT_CALLBACK, TASK_PRIORITY_COMMON, dsn::THREAD_POOL_DEFAULT)
enum class meta_op_status
{
FREE = 0,
RECALL,
BALANCE,
BACKUP,
BULKLOAD,
RESTORE,
MANUAL_COMPACT,
INVALID
};
ENUM_BEGIN(meta_op_status, meta_op_status::INVALID)
ENUM_REG(meta_op_status::FREE)
ENUM_REG(meta_op_status::RECALL)
ENUM_REG(meta_op_status::BALANCE)
ENUM_REG(meta_op_status::BACKUP)
ENUM_REG(meta_op_status::BULKLOAD)
ENUM_REG(meta_op_status::RESTORE)
ENUM_REG(meta_op_status::MANUAL_COMPACT)
ENUM_END(meta_op_status)
// The leader status of meta server
enum class meta_leader_state : int
{
kIsLeader, // the meta is leader
kNotLeaderAndCanForwardRpc, // meta isn't leader, and rpc-msg can forward to others
kNotLeaderAndCannotForwardRpc, // meta isn't leader, and rpc-msg can't forward to others
};
// Meta server service for EON (rDSN layer 2).
class meta_service : public serverlet<meta_service>
{
public:
meta_service();
virtual ~meta_service();
error_code start();
void stop();
const replication_options &get_options() const { return _opts; }
const meta_options &get_meta_options() const { return _meta_opts; }
/// NOTE: prefer using mss::meta_storage instead.
dist::meta_state_service *get_remote_storage() const { return _storage.get(); }
mss::meta_storage *get_meta_storage() const { return _meta_storage.get(); }
server_state *get_server_state() { return _state.get(); }
security::access_controller *get_access_controller() { return _access_controller.get(); }
server_load_balancer *get_balancer() { return _balancer.get(); }
partition_guardian *get_partition_guardian() { return _partition_guardian.get(); }
dist::block_service::block_service_manager &get_block_service_manager()
{
return _block_service_manager;
}
bulk_load_service *get_bulk_load_service() { return _bulk_load_svc.get(); }
meta_function_level::type get_function_level()
{
meta_function_level::type level = _function_level.load();
if (level > meta_function_level::fl_freezed && check_freeze()) {
level = meta_function_level::fl_freezed;
}
return level;
}
void set_function_level(meta_function_level::type level) { _function_level.store(level); }
template <typename TResponse>
void reply_data(dsn::message_ex *request, const TResponse &data)
{
dsn::message_ex *response = request->create_response();
dsn::marshall(response, data);
reply_message(request, response);
}
virtual void reply_message(dsn::message_ex *, dsn::message_ex *response)
{
dsn_rpc_reply(response);
}
virtual void send_message(const host_port &target, dsn::message_ex *request)
{
dsn_rpc_call_one_way(dsn::dns_resolver::instance().resolve_address(target), request);
}
virtual void send_request(dsn::message_ex * /*req*/,
const host_port &target,
const rpc_response_task_ptr &callback)
{
dsn_rpc_call(dsn::dns_resolver::instance().resolve_address(target), callback);
}
// these two callbacks are running in fd's thread_pool, and in fd's lock
void set_node_state(const std::vector<host_port> &nodes_list, bool is_alive);
void get_node_state(/*out*/ std::map<host_port, bool> &all_nodes);
void start_service();
void balancer_run();
dsn::task_tracker *tracker() { return &_tracker; }
size_t get_alive_node_count() const;
bool try_lock_meta_op_status(meta_op_status op_status);
void unlock_meta_op_status();
meta_op_status get_op_status() const { return _meta_op_status.load(); }
std::string get_meta_list_string() const
{
std::string metas;
for (const auto &node : _opts.meta_servers) {
metas = fmt::format("{}{},", metas, node.to_string());
}
return metas.substr(0, metas.length() - 1);
}
std::string cluster_root() const { return _cluster_root; }
private:
void register_rpc_handlers();
void register_ctrl_commands();
// client => meta server
void on_query_configuration_by_index(configuration_query_by_index_rpc rpc);
// partition server => meta server
void on_config_sync(configuration_query_by_node_rpc rpc);
// update configuration
void on_propose_balancer(configuration_balancer_rpc rpc);
void on_update_configuration(dsn::message_ex *req);
// app operations
void on_create_app(dsn::message_ex *req);
void on_drop_app(dsn::message_ex *req);
void on_recall_app(dsn::message_ex *req);
void on_rename_app(configuration_rename_app_rpc rpc);
void on_list_apps(configuration_list_apps_rpc rpc);
void on_list_nodes(configuration_list_nodes_rpc rpc);
// app env operations
void update_app_env(app_env_rpc env_rpc);
// ddd diagnose
void ddd_diagnose(ddd_diagnose_rpc rpc);
// cluster info
void on_query_cluster_info(configuration_cluster_info_rpc rpc);
// meta control
void on_control_meta_level(configuration_meta_control_rpc rpc);
void on_start_recovery(configuration_recovery_rpc rpc);
// backup/restore
void on_start_backup_app(start_backup_app_rpc rpc);
void on_query_backup_status(query_backup_status_rpc rpc);
void on_start_restore(dsn::message_ex *req);
void on_add_backup_policy(dsn::message_ex *req);
void on_query_backup_policy(query_backup_policy_rpc policy_rpc);
void on_modify_backup_policy(configuration_modify_backup_policy_rpc rpc);
void on_report_restore_status(configuration_report_restore_status_rpc rpc);
void on_query_restore_status(configuration_query_restore_rpc rpc);
// duplication
void on_add_duplication(duplication_add_rpc rpc);
void on_modify_duplication(duplication_modify_rpc rpc);
void on_query_duplication_info(duplication_query_rpc rpc);
void on_duplication_sync(duplication_sync_rpc rpc);
void register_duplication_rpc_handlers();
void recover_duplication_from_meta_state();
void initialize_duplication_service();
// split
void on_start_partition_split(start_split_rpc rpc);
void on_control_partition_split(control_split_rpc rpc);
void on_query_partition_split(query_split_rpc rpc);
void on_register_child_on_meta(register_child_rpc rpc);
void on_notify_stop_split(notify_stop_split_rpc rpc);
void on_query_child_state(query_child_state_rpc rpc);
// bulk load
void on_start_bulk_load(start_bulk_load_rpc rpc);
void on_control_bulk_load(control_bulk_load_rpc rpc);
void on_query_bulk_load_status(query_bulk_load_rpc rpc);
void on_clear_bulk_load(clear_bulk_load_rpc rpc);
// manual compaction
void on_start_manual_compact(start_manual_compact_rpc rpc);
void on_query_manual_compact_status(query_manual_compact_rpc rpc);
// get/set max_replica_count of an app
void on_get_max_replica_count(configuration_get_max_replica_count_rpc rpc);
void on_set_max_replica_count(configuration_set_max_replica_count_rpc rpc);
// if return 'kNotLeaderAndCannotForwardRpc' and 'forward_address' != nullptr, then return
// leader by 'forward_address'.
meta_leader_state check_leader(dsn::message_ex *req, dsn::host_port *forward_address);
template <typename TRpcHolder>
meta_leader_state check_leader(TRpcHolder rpc, /*out*/ host_port *forward_address);
// app_name: when the Ranger ACL is enabled, some rpc requests need to verify the app_name
// ret:
// false: rpc request check failed because check leader failed or ACL authentication failed
// true: rpc request check and authentication succeed
template <typename TRpcHolder>
bool check_status_and_authz(TRpcHolder rpc,
/*out*/ host_port *forward_address = nullptr,
const std::string &app_name = "");
// app_name: when the Ranger ACL is enabled, some rpc requests need to verify the app_name
// ret:
// false: rpc request check failed because check leader failed or ACL authentication failed
// true: rpc request check and authentication succeed
template <typename TRespType>
bool check_status_and_authz_with_reply(message_ex *req,
TRespType &response_struct,
const std::string &app_name = "");
template <typename TReqType, typename TRespType>
bool check_status_and_authz_with_reply(message_ex *msg);
template <typename TRpcHolder>
bool check_leader_status(TRpcHolder rpc, host_port *forward_address = nullptr);
error_code remote_storage_initialize();
bool check_freeze() const;
private:
friend class backup_engine_test;
friend class backup_service_test;
friend class bulk_load_service_test;
friend class meta_backup_service_test;
friend class meta_backup_test_base;
friend class meta_duplication_service;
friend class meta_http_service;
friend class meta_http_service_test;
friend class meta_partition_guardian_test;
friend class meta_service_test;
friend class meta_service_test_app;
friend class meta_split_service_test;
friend class meta_test_base;
friend class policy_context_test;
friend class server_state_restore_test;
friend class test::test_checker;
friend class fake_receiver_meta_service;
replication_options _opts;
meta_options _meta_opts;
int32_t _node_live_percentage_threshold_for_update;
std::unique_ptr<command_deregister> _ctrl_node_live_percentage_threshold_for_update;
std::shared_ptr<server_state> _state;
std::shared_ptr<meta_server_failure_detector> _failure_detector;
std::shared_ptr<dist::meta_state_service> _storage;
std::unique_ptr<mss::meta_storage> _meta_storage;
std::shared_ptr<server_load_balancer> _balancer;
std::shared_ptr<backup_service> _backup_handler;
std::shared_ptr<partition_guardian> _partition_guardian;
std::unique_ptr<meta_duplication_service> _dup_svc;
std::unique_ptr<meta_split_service> _split_svc;
std::unique_ptr<bulk_load_service> _bulk_load_svc;
// handle all the block filesystems for current meta service
// (in other words, current service node)
dist::block_service::block_service_manager _block_service_manager;
// [
// this is protected by failure_detector::_lock
std::set<host_port> _alive_set;
std::set<host_port> _dead_set;
// ]
mutable zrwlock_nr _meta_lock;
std::atomic_bool _started;
std::atomic_bool _recovering;
// reference replication.thrift for what the meta_function_level means
std::atomic<meta_function_level::type> _function_level;
std::string _cluster_root;
METRIC_VAR_DECLARE_counter(replica_server_disconnections);
METRIC_VAR_DECLARE_gauge_int64(unalive_replica_servers);
METRIC_VAR_DECLARE_gauge_int64(alive_replica_servers);
dsn::task_tracker _tracker;
std::shared_ptr<security::access_controller> _access_controller;
// Use Apache Ranger for access control, which is nullptr when not use
std::shared_ptr<ranger::ranger_resource_policy_manager> _ranger_resource_policy_manager;
// indicate which operation is processeding in meta server
std::atomic<meta_op_status> _meta_op_status;
};
template <typename TRpcHolder>
meta_leader_state meta_service::check_leader(TRpcHolder rpc, host_port *forward_address)
{
host_port leader;
if (!_failure_detector->get_leader(&leader)) {
if (!rpc.dsn_request()->header->context.u.is_forward_supported) {
if (forward_address != nullptr)
*forward_address = leader;
return meta_leader_state::kNotLeaderAndCannotForwardRpc;
}
if (!leader.is_invalid()) {
rpc.forward(dsn::dns_resolver::instance().resolve_address(leader));
return meta_leader_state::kNotLeaderAndCanForwardRpc;
} else {
if (forward_address != nullptr)
forward_address->reset();
return meta_leader_state::kNotLeaderAndCannotForwardRpc;
}
}
return meta_leader_state::kIsLeader;
}
template <typename TRpcHolder>
bool meta_service::check_leader_status(TRpcHolder rpc, host_port *forward_address)
{
auto result = check_leader(rpc, forward_address);
if (result == meta_leader_state::kNotLeaderAndCanForwardRpc)
return false;
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc || !_started) {
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc) {
rpc.response().err = ERR_FORWARD_TO_OTHERS;
} else if (_recovering) {
rpc.response().err = ERR_UNDER_RECOVERY;
} else {
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
}
LOG_INFO("reject request with {}", rpc.response().err);
return false;
}
return true;
}
// when the Ranger ACL is enabled, only the leader meta_server will pull Ranger policy, so if it is
// not the leader, _access_controller may be a null pointer, or a new leader is elected, and the
// above policy information may be out of date.
template <typename TRpcHolder>
bool meta_service::check_status_and_authz(TRpcHolder rpc,
host_port *forward_address,
const std::string &app_name)
{
if (!check_leader_status(rpc, forward_address)) {
return false;
}
if (!_access_controller->allowed(rpc.dsn_request(), app_name)) {
rpc.response().err = ERR_ACL_DENY;
LOG_DEBUG("not authorized {} to operate on app({}) for user({})",
rpc.dsn_request()->rpc_code(),
app_name,
rpc.dsn_request()->io_session->get_client_username());
return false;
}
return true;
}
template <typename TRespType>
bool meta_service::check_status_and_authz_with_reply(message_ex *req,
TRespType &response_struct,
const std::string &app_name)
{
auto result = check_leader(req, nullptr);
if (result == meta_leader_state::kNotLeaderAndCanForwardRpc) {
return false;
}
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc || !_started) {
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc) {
response_struct.err = ERR_FORWARD_TO_OTHERS;
} else if (_recovering) {
response_struct.err = ERR_UNDER_RECOVERY;
} else {
response_struct.err = ERR_SERVICE_NOT_ACTIVE;
}
LOG_DEBUG("reject request with {}", response_struct.err);
reply(req, response_struct);
return false;
}
if (!_access_controller->allowed(req, app_name)) {
response_struct.err = ERR_ACL_DENY;
LOG_DEBUG("not authorized {} to operate on app({}) for user({})",
req->rpc_code(),
app_name,
req->io_session->get_client_username());
reply(req, response_struct);
return false;
}
return true;
}
template <typename TReqType, typename TRespType>
bool meta_service::check_status_and_authz_with_reply(message_ex *msg)
{
TReqType req;
TRespType resp;
dsn::message_ex *copied_msg = message_ex::copy_message_no_reply(*msg);
dsn::unmarshall(copied_msg, req);
return check_status_and_authz_with_reply(msg, resp, req.app_name);
}
} // namespace replication
} // namespace dsn