blob: fe719a898773fdff32b36c480950d57b3e61c34b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <ctype.h>
#include <algorithm>
#include <chrono>
#include <iosfwd>
#include <memory>
#include <unordered_set>
#include <utility>
// Disable class-memaccess warning to facilitate compilation with gcc>7
// https://github.com/Tencent/rapidjson/issues/1700
#pragma GCC diagnostic push
#if defined(__GNUC__) && __GNUC__ >= 8
#pragma GCC diagnostic ignored "-Wclass-memaccess"
#endif
#include <rapidjson/document.h>
#pragma GCC diagnostic pop
#include "common/replica_envs.h"
#include "common/replication.codes.h"
#include "common/replication_common.h"
#include "dsn.layer2_types.h"
#include "fmt/core.h"
#include "meta/meta_service.h"
#include "meta/meta_state_service.h"
#include "meta/server_state.h"
#include "meta_admin_types.h"
#include "ranger/ranger_resource_policy.h"
#include "ranger/ranger_resource_policy_manager.h"
#include "ranger_resource_policy_manager.h"
#include "rapidjson/allocators.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "utils/blob.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/process_utils.h"
#include "utils/strings.h"
namespace dsn {
namespace ranger {
// How often, in seconds, the meta server pulls the latest policies from the Ranger service; used
// as the interval of the timer started by ranger_resource_policy_manager::start().
DSN_DEFINE_uint32(security,
                  update_ranger_policy_interval_sec,
                  5,
                  "The interval seconds of meta server to pull the latest access control policy "
                  "from Ranger service.");
// Base URL of the Ranger service; the policies are pulled from '<url>/<service name>'.
DSN_DEFINE_string(ranger, ranger_service_url, "", "Apache Ranger service url.");
// Name of the Ranger service whose policies are pulled (appended to 'ranger_service_url').
DSN_DEFINE_string(ranger,
                  ranger_service_name,
                  "",
                  "The name of the policies defined in the Ranger service.");
// Database name used to match tables whose names are not of the form
// '{database_name}.{table_name}'.
DSN_DEFINE_string(ranger,
                  legacy_table_database_mapping_policy_name,
                  "__default__",
                  "The name of the Ranger database policy matched by the legacy table(The "
                  "table name does not follow the naming rules of {database_name}.{table_name})");
// Helpers used while walking the rapidjson DOM of the Ranger policies. Every macro argument is
// parenthesized in the expansion so that any expression (e.g. '*ptr') can be passed safely.

// Returns 'dsn::ERR_RANGER_PARSE_ACL' from the enclosing function unless 'obj' is a JSON object
// containing the member 'member'.
#define RETURN_ERR_IF_MISSING_MEMBER(obj, member)                                                  \
    do {                                                                                           \
        if (!(obj).IsObject() || !(obj).HasMember(member)) {                                       \
            return dsn::ERR_RANGER_PARSE_ACL;                                                      \
        }                                                                                          \
    } while (0)

// Skips to the next iteration of the enclosing loop unless 'obj' is a JSON object containing the
// member 'member'.
//
// This must NOT be wrapped in 'do { ... } while (0)': a 'continue' inside that wrapper only jumps
// to the wrapper's own 'while (0)' test and then falls through, so the enclosing loop would not be
// skipped at all. The 'if {} else' form still binds a following 'else' correctly and, like a
// normal statement, requires a trailing semicolon at the call site.
#define CONTINUE_IF_MISSING_MEMBER(obj, member)                                                    \
    if ((obj).IsObject() && (obj).HasMember(member)) {                                             \
    } else                                                                                         \
        continue

// Returns 'dsn::ERR_RANGER_PARSE_ACL' from the enclosing function unless 'obj' is a non-empty
// JSON array (an empty array is rejected too).
#define RETURN_ERR_IF_NOT_ARRAY(obj)                                                               \
    do {                                                                                           \
        if (!(obj).IsArray() || (obj).Empty()) {                                                   \
            return dsn::ERR_RANGER_PARSE_ACL;                                                      \
        }                                                                                          \
    } while (0)

// Returns from the enclosing void function unless 'obj' is a non-empty JSON array.
#define RETURN_VOID_IF_NOT_ARRAY(obj)                                                              \
    do {                                                                                           \
        if (!(obj).IsArray() || (obj).Empty()) {                                                   \
            return;                                                                                \
        }                                                                                          \
    } while (0)
namespace {
// Registers every RPC named in 'rpc_codes' into 'ac_type_of_rpc' with the access type 'ac_type'.
//
// Aborts if a name is not a registered task code, or if a code is already present in
// 'ac_type_of_rpc' (emplace() would otherwise silently keep the first access type).
// TODO(wanghao): A better way is to define the ac_type when defining rpc, and traverse all RPCs to
// register to avoid omission or duplication.
void register_rpc_access_type(access_type ac_type,
                              const std::vector<std::string> &rpc_codes,
                              access_type_of_rpc_code &ac_type_of_rpc)
{
    for (const auto &rpc_code : rpc_codes) {
        const auto code = task_code::try_get(rpc_code, TASK_CODE_INVALID);
        CHECK(code != TASK_CODE_INVALID, "invalid rpc code: {}", rpc_code);
        const bool inserted = ac_type_of_rpc.emplace(code, ac_type).second;
        CHECK(inserted, "rpc code {} is registered more than once", rpc_code);
    }
}

// Maps the upper-cased access type names found in the Ranger policy JSON to 'access_type'.
// (The identifier's spelling is kept as-is because other code in this file refers to it.)
const std::map<std::string, access_type> kAccessTypeMaping({{"READ", access_type::kRead},
                                                            {"WRITE", access_type::kWrite},
                                                            {"CREATE", access_type::kCreate},
                                                            {"DROP", access_type::kDrop},
                                                            {"LIST", access_type::kList},
                                                            {"METADATA", access_type::kMetadata},
                                                            {"CONTROL", access_type::kControl}});

// Pulls the policies in JSON format from '<ranger_service_url>/<ranger_service_name>' by running
// 'curl', and stores the raw response body in '*ranger_policies'.
//
// Returns ERR_SYNC_RANGER_POLICIES_FAILED if the command exits with a non-zero status.
//
// curl options:
//   -s -S: suppress the progress meter, which curl writes to stderr whenever stdout is not a
//          terminal (here it is a pipe, so every pull would spam stderr), but keep error messages.
//   -f:    exit with a non-zero status on HTTP errors instead of returning the server's error
//          page as if it were the policy document.
dsn::error_code pull_policies_from_ranger_service(std::string *ranger_policies)
{
    const std::string cmd =
        fmt::format("curl -sSf {}/{}", FLAGS_ranger_service_url, FLAGS_ranger_service_name);
    std::stringstream resp;
    if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) {
        return dsn::ERR_SYNC_RANGER_POLICIES_FAILED;
    }
    *ranger_policies = resp.str();
    return dsn::ERR_OK;
}
} // anonymous namespace
// Delay before retrying a failed (timed out) remote-storage operation on the Ranger policies.
const std::chrono::milliseconds kLoadRangerPolicyRetryDelayMs = std::chrono::seconds(10);
ranger_resource_policy_manager::ranger_resource_policy_manager(
    dsn::replication::meta_service *meta_svc)
    : _meta_svc(meta_svc), _local_policy_version(-1)
{
    // Bind the target table once, so each registration below only lists the required access type
    // and the RPC names that need it.
    const auto register_global = [this](access_type ac_type,
                                        const std::vector<std::string> &rpc_codes) {
        register_rpc_access_type(ac_type, rpc_codes, _ac_type_of_global_rpcs);
    };
    const auto register_database = [this](access_type ac_type,
                                          const std::vector<std::string> &rpc_codes) {
        register_rpc_access_type(ac_type, rpc_codes, _ac_type_of_database_rpcs);
    };

    // RPCs checked against GLOBAL level policies.
    register_global(access_type::kMetadata,
                    {"RPC_CM_LIST_NODES", "RPC_CM_CLUSTER_INFO", "RPC_QUERY_DISK_INFO"});
    register_global(access_type::kControl,
                    {"RPC_HTTP_SERVICE",
                     "RPC_CM_CONTROL_META",
                     "RPC_CM_START_RECOVERY",
                     "RPC_REPLICA_DISK_MIGRATE",
                     "RPC_ADD_NEW_DISK",
                     "RPC_DETECT_HOTKEY",
                     "RPC_CLI_CLI_CALL_ACK"});

    // RPCs checked against DATABASE level policies.
    register_database(access_type::kList, {"RPC_CM_LIST_APPS"});
    register_database(access_type::kCreate, {"RPC_CM_CREATE_APP"});
    register_database(access_type::kDrop, {"RPC_CM_DROP_APP", "RPC_CM_RECALL_APP"});
    register_database(access_type::kMetadata,
                      {"RPC_CM_QUERY_BACKUP_STATUS",
                       "RPC_CM_QUERY_RESTORE_STATUS",
                       "RPC_CM_QUERY_DUPLICATION",
                       "RPC_CM_QUERY_PARTITION_SPLIT",
                       "RPC_CM_QUERY_BULK_LOAD_STATUS",
                       "RPC_CM_QUERY_MANUAL_COMPACT_STATUS",
                       "RPC_CM_GET_MAX_REPLICA_COUNT"});
    register_database(access_type::kControl,
                      {"RPC_CM_START_BACKUP_APP",
                       "RPC_CM_START_RESTORE",
                       "RPC_CM_PROPOSE_BALANCER",
                       "RPC_CM_ADD_DUPLICATION",
                       "RPC_CM_MODIFY_DUPLICATION",
                       "RPC_CM_UPDATE_APP_ENV",
                       "RPC_CM_DDD_DIAGNOSE",
                       "RPC_CM_START_PARTITION_SPLIT",
                       "RPC_CM_CONTROL_PARTITION_SPLIT",
                       "RPC_CM_START_BULK_LOAD",
                       "RPC_CM_CONTROL_BULK_LOAD",
                       "RPC_CM_CLEAR_BULK_LOAD",
                       "RPC_CM_START_MANUAL_COMPACT",
                       "RPC_CM_SET_MAX_REPLICA_COUNT",
                       "RPC_CM_RENAME_APP"});
}
void ranger_resource_policy_manager::start()
{
CHECK_NOTNULL(_meta_svc, "");
_ranger_policy_meta_root = dsn::utils::filesystem::concat_path_unix_style(
_meta_svc->cluster_root(), "ranger_policy_meta_root");
tasking::enqueue_timer(LPC_USE_RANGER_ACCESS_CONTROL,
&_tracker,
[this]() { this->update_policies_from_ranger_service(); },
std::chrono::seconds(FLAGS_update_ranger_policy_interval_sec),
0,
std::chrono::milliseconds(1));
}
// Decides whether 'user_name' may issue the RPC 'rpc_code'.
//
// GLOBAL-level RPCs are checked against the cached GLOBAL policies only. DATABASE-level RPCs are
// checked against the cached DATABASE policies for 'database_name'; tables without a database
// part fall back to 'legacy_table_database_mapping_policy_name'. RPCs registered in neither table
// are denied.
access_control_result ranger_resource_policy_manager::allowed(
    const int rpc_code, const std::string &user_name, const std::string &database_name) const
{
    const auto global_it = _ac_type_of_global_rpcs.find(rpc_code);
    if (global_it != _ac_type_of_global_rpcs.end()) {
        utils::auto_read_lock l(_global_policies_lock);
        return check_ranger_resource_policy_allowed(_global_policies_cache,
                                                    global_it->second,
                                                    user_name,
                                                    match_database_type::kNotNeed,
                                                    "",
                                                    "");
    }

    const auto database_it = _ac_type_of_database_rpcs.find(rpc_code);
    if (database_it != _ac_type_of_database_rpcs.end()) {
        utils::auto_read_lock l(_database_policies_lock);
        return check_ranger_resource_policy_allowed(
            _database_policies_cache,
            database_it->second,
            user_name,
            match_database_type::kNeed,
            database_name,
            FLAGS_legacy_table_database_mapping_policy_name);
    }

    // Not a known GLOBAL or DATABASE RPC: deny by default.
    return access_control_result::kDenied;
}
// Parses a Ranger policy item array (e.g. "policyItems" or "denyPolicyItems") in 'data' and
// appends one 'policy_item' per usable entry to 'policies', which must be empty on entry.
//
// For each entry, the "type" (case-insensitive) of every access whose "isAllowed" is true and
// whose type is known is OR-ed into 'access_types', and every name in "users" is collected.
// "groups" and "roles" are not read. Malformed input is skipped rather than reported:
//   - an entry without an "accesses" array or without a "users" array is skipped entirely;
//   - an access without a boolean "isAllowed" or a string "type", or with an unknown type, is
//     ignored;
//   - a non-string user name is ignored.
// Nothing is appended if 'data' is not a non-empty array.
void ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::Value &data,
                                                              std::vector<policy_item> &policies)
{
    CHECK(policies.empty(), "Ranger policy list should be empty before parsing.");
    RETURN_VOID_IF_NOT_ARRAY(data);
    for (const auto &item : data.GetArray()) {
        // Explicit 'if (...) continue;' checks are used so that the loop is really skipped, and
        // so that no rapidjson accessor is called on a missing member or a value of the wrong
        // type.
        if (!item.IsObject() || !item.HasMember("accesses") || !item["accesses"].IsArray()) {
            continue;
        }
        policy_item pi;
        for (const auto &access : item["accesses"].GetArray()) {
            if (!access.IsObject() || !access.HasMember("isAllowed") ||
                !access.HasMember("type")) {
                continue;
            }
            const rapidjson::Value &is_allowed = access["isAllowed"];
            const rapidjson::Value &type_value = access["type"];
            if (!is_allowed.IsBool() || !is_allowed.GetBool() || !type_value.IsString()) {
                continue;
            }
            std::string type = type_value.GetString();
            // Convert through 'unsigned char': passing a negative 'char' (e.g. a byte of a
            // multi-byte UTF-8 sequence) to 'toupper' is undefined behavior.
            std::transform(type.begin(), type.end(), type.begin(), [](unsigned char c) {
                return static_cast<char>(toupper(c));
            });
            const auto at = kAccessTypeMaping.find(type);
            // Ignore unknown access types.
            if (at != kAccessTypeMaping.end()) {
                pi.access_types |= at->second;
            }
        }
        if (!item.HasMember("users") || !item["users"].IsArray()) {
            continue;
        }
        for (const auto &user : item["users"].GetArray()) {
            if (user.IsString()) {
                pi.users.emplace(user.GetString());
            }
        }
        policies.emplace_back(std::move(pi));
    }
}
// Periodic entry point. It pulls the latest policies from Ranger. When their version changed,
// it reloads them and asynchronously persists them and refreshes the caches and app envs.
// Otherwise it only re-syncs the app envs.
dsn::error_code ranger_resource_policy_manager::update_policies_from_ranger_service()
{
    std::string ranger_policies;
    LOG_AND_RETURN_NOT_OK(
        ERROR, pull_policies_from_ranger_service(&ranger_policies), "Pull Ranger policies failed.");
    LOG_DEBUG("Pull Ranger policies success.");

    const auto load_err = load_policies_from_json(ranger_policies);
    if (load_err != dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) {
        LOG_AND_RETURN_NOT_OK(ERROR, load_err, "Parse Ranger policies failed.");
        start_to_dump_and_sync_policies();
        return dsn::ERR_OK;
    }

    LOG_DEBUG("Skip to update local policies.");
    // Newly created tables start with empty app envs. Sync the app envs on every round, whether
    // or not the Ranger policies changed.
    LOG_AND_RETURN_NOT_OK(ERROR, sync_policies_to_app_envs(), "Sync policies to app envs failed.");
    return dsn::ERR_OK;
}
// Parses the policy document 'data' pulled from Ranger into '_all_resource_policies', keyed by
// the resource type of each policy.
//
// Returns:
//   - ERR_RANGER_POLICIES_NO_NEED_UPDATE if the remote "policyVersion" is not newer than the
//     version loaded last time;
//   - ERR_RANGER_PARSE_ACL if 'data' is not valid JSON or does not have the expected layout;
//   - ERR_OK once '_all_resource_policies' has been replaced with the parsed policies.
//
// '_all_resource_policies' and '_local_policy_version' are only modified on success. A malformed
// document therefore neither wipes the previously loaded policies nor marks its version as loaded.
// Marking it loaded would make every later round report NO_NEED_UPDATE, so it would never be
// parsed again.
dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const std::string &data)
{
    // The Ranger policy pulled from Ranger service demo.
    /*
    {
        "serviceName": "PEGASUS1",
        "serviceId": 1069,
        "policyVersion": 60,
        "policyUpdateTime": 1673254471000,
        "policies": [{
            "id": 5334,
            "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8",
            "isEnabled": true,
            "version": 13,
            "service": "PEGASUS1",
            "name": "all - database",
            "policyType": 0,
            "policyPriority": 0,
            "description": "Policy for all - database",
            "isAuditEnabled": true,
            "resources": {
                "database": {
                    "values": ["PEGASUS1"],
                    "isExcludes": false,
                    "isRecursive": true
                }
            },
            "policyItems": [{
                "accesses": [{
                    "type": "create",
                    "isAllowed": true
                }, {
                    "type": "drop",
                    "isAllowed": true
                }, {
                    "type": "control",
                    "isAllowed": true
                }, {
                    "type": "metadata",
                    "isAllowed": true
                }, {
                    "type": "list",
                    "isAllowed": true
                }],
                "users": ["PEGASUS1"],
                "groups": [],
                "roles": [],
                "conditions": [],
                "delegateAdmin": true
            }],
            "denyPolicyItems": [],
            "allowExceptions": [],
            "denyExceptions": [],
            "dataMaskPolicyItems": [],
            "rowFilterPolicyItems": [],
            "serviceType": "pegasus",
            "options": {},
            "validitySchedules": [],
            "policyLabels": [],
            "zoneName": "",
            "isDenyAllElse": false
        }],
        "auditMode": "audit-default",
        "serviceConfig": {}
    }
    */
    rapidjson::Document doc;
    doc.Parse(data.c_str());
    if (doc.HasParseError()) {
        return dsn::ERR_RANGER_PARSE_ACL;
    }

    // Check if it is needed to update policies.
    RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion");
    if (!doc["policyVersion"].IsInt()) {
        return dsn::ERR_RANGER_PARSE_ACL;
    }
    const int remote_policy_version = doc["policyVersion"].GetInt();
    if (_local_policy_version == remote_policy_version) {
        LOG_DEBUG("Ranger policy version: {}, no need to update.", _local_policy_version);
        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
    }
    if (_local_policy_version > remote_policy_version) {
        LOG_WARNING("Local Ranger policy version ({}) is larger than remote version ({}), please "
                    "check Ranger services ({}).",
                    _local_policy_version,
                    remote_policy_version,
                    FLAGS_ranger_service_name);
        return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE;
    }

    // Parse into a local container; it is committed only if the whole document parses.
    all_resource_policies parsed_policies;
    // TODO(wanghao): it's optional
    // Provide a DATABASE default policy for legacy tables.
    // ranger_resource_policy default_database_policy;
    // ranger_resource_policy::create_default_database_policy(default_database_policy);
    // parsed_policies[enum_to_string(resource_type::kDatabase)] = {default_database_policy};
    RETURN_ERR_IF_MISSING_MEMBER(doc, "policies");
    const rapidjson::Value &policies = doc["policies"];
    RETURN_ERR_IF_NOT_ARRAY(policies);
    for (const auto &policy : policies.GetArray()) {
        RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled");
        // 1. Check if the policy is enabled or not.
        if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) {
            continue;
        }

        // 2. Parse resource type.
        RETURN_ERR_IF_MISSING_MEMBER(policy, "resources");
        const rapidjson::Value &resources = policy["resources"];
        if (!resources.IsObject()) {
            return dsn::ERR_RANGER_PARSE_ACL;
        }
        std::map<std::string, std::unordered_set<std::string>> values_of_resource_type;
        for (const auto &resource : resources.GetObject()) {
            RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values");
            RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]);
            std::unordered_set<std::string> values;
            for (const auto &v : (resource.value)["values"].GetArray()) {
                if (!v.IsString()) {
                    return dsn::ERR_RANGER_PARSE_ACL;
                }
                values.insert(v.GetString());
            }
            values_of_resource_type.emplace(resource.name.GetString(), std::move(values));
        }

        // 3. Construct ACL policy. Policies without a string "name" are skipped (checked
        // explicitly so that the skip really happens and no missing member is accessed).
        if (!policy.HasMember("name") || !policy["name"].IsString()) {
            continue;
        }
        ranger_resource_policy resource_policy;
        resource_policy.name = policy["name"].GetString();
        resource_type rt = resource_type::kUnknown;
        do {
            // TODO(wanghao): refactor the following code
            // parse Ranger policies json string into `values_of_resource_type`, distinguish
            // resource types by `values_of_resource_type.size()`
            if (values_of_resource_type.size() == 1) {
                auto iter = values_of_resource_type.find("global");
                if (iter != values_of_resource_type.end()) {
                    rt = resource_type::kGlobal;
                    break;
                }
                iter = values_of_resource_type.find("database");
                if (iter != values_of_resource_type.end()) {
                    resource_policy.database_names = iter->second;
                    rt = resource_type::kDatabase;
                    break;
                }
            } else if (values_of_resource_type.size() == 2) {
                auto iter1 = values_of_resource_type.find("database");
                auto iter2 = values_of_resource_type.find("table");
                if (iter1 != values_of_resource_type.end() &&
                    iter2 != values_of_resource_type.end()) {
                    resource_policy.database_names = iter1->second;
                    resource_policy.table_names = iter2->second;
                    rt = resource_type::kDatabaseTable;
                    break;
                }
            }
            // Any other combination of resources is not supported.
            return dsn::ERR_RANGER_PARSE_ACL;
        } while (false);

        // A missing item list is treated as empty instead of calling operator[] on a missing
        // member.
        const auto parse_items = [&](const char *key, std::vector<policy_item> &items) {
            if (policy.HasMember(key)) {
                parse_policies_from_json(policy[key], items);
            }
        };
        parse_items("policyItems", resource_policy.policies.allow_policies);
        parse_items("denyPolicyItems", resource_policy.policies.deny_policies);
        parse_items("allowExceptions", resource_policy.policies.allow_policies_exclude);
        parse_items("denyExceptions", resource_policy.policies.deny_policies_exclude);

        // 4. Add the ACL policy.
        parsed_policies[enum_to_string(rt)].emplace_back(std::move(resource_policy));
    }

    // 5. Commit the parsed policies and their version together.
    _all_resource_policies.swap(parsed_policies);
    _local_policy_version = remote_policy_version;
    return dsn::ERR_OK;
}
void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
{
LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
dsn::task_ptr sync_task = dsn::tasking::create_task(
LPC_USE_RANGER_ACCESS_CONTROL, &_tracker, [this]() { dump_and_sync_policies(); });
_meta_svc->get_remote_storage()->create_node(
_ranger_policy_meta_root,
LPC_USE_RANGER_ACCESS_CONTROL,
[this, sync_task](dsn::error_code err) {
if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
LOG_DEBUG("Create Ranger policy meta root succeed.");
sync_task->enqueue();
return;
}
CHECK_EQ(err, dsn::ERR_TIMEOUT);
LOG_ERROR("Create Ranger policy meta root timeout, retry later.");
dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
&_tracker,
[this]() { start_to_dump_and_sync_policies(); },
0,
kLoadRangerPolicyRetryDelayMs);
});
}
// Persists the loaded policies, publishes them to the caches read by allowed(), and syncs the
// DATABASE_TABLE level policies into the tables' app envs. A failed app-env sync is logged, not
// propagated.
void ranger_resource_policy_manager::dump_and_sync_policies()
{
    LOG_DEBUG("Start to sync Ranger policies to remote storage.");
    // This only issues the asynchronous write. Its completion (or a retry) is logged by the
    // write callback inside dump_policies_to_remote_storage(), so success must not be claimed
    // here.
    dump_policies_to_remote_storage();
    LOG_DEBUG("Issued dumping Ranger policies to remote storage.");
    update_cached_policies();
    LOG_DEBUG("Update using resources policies succeed.");
    if (dsn::ERR_OK != sync_policies_to_app_envs()) {
        LOG_ERROR("Sync policies to app envs failed.");
    }
}
// Serializes '_all_resource_policies' to JSON and writes it to the policy meta root node on the
// remote storage. The write is asynchronous. A timeout schedules another attempt after
// 'kLoadRangerPolicyRetryDelayMs', which re-serializes the policies held at that time. Any other
// error aborts the process.
void ranger_resource_policy_manager::dump_policies_to_remote_storage()
{
    dsn::blob value = json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
    const auto on_written = [this](dsn::error_code e) {
        if (e != dsn::ERR_OK) {
            // Any error other than a timeout is unexpected, so assert on it.
            CHECK_EQ(e, dsn::ERR_TIMEOUT);
            LOG_ERROR("Dump Ranger policies to remote storage timeout, retry later.");
            dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
                                  &_tracker,
                                  [this]() { dump_policies_to_remote_storage(); },
                                  0,
                                  kLoadRangerPolicyRetryDelayMs);
            return;
        }
        LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
    };
    _meta_svc->get_remote_storage()->set_data(
        _ranger_policy_meta_root, value, LPC_USE_RANGER_ACCESS_CONTROL, on_written);
}
void ranger_resource_policy_manager::update_cached_policies()
{
{
utils::auto_write_lock l(_global_policies_lock);
_global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]);
// TODO(wanghao): provide a query method
}
{
utils::auto_write_lock l(_database_policies_lock);
_database_policies_cache.swap(
_all_resource_policies[enum_to_string(resource_type::kDatabase)]);
// TODO(wanghao): provide a query method
}
}
// Writes the DATABASE_TABLE level policies matching each available table into that table's app
// env 'REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES', and deletes that env from tables no policy
// matches. Does nothing when no DATABASE_TABLE level policy is loaded. Returns the first error
// from listing the apps or from updating an app's envs; the remaining apps are then not synced.
dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
{
    const auto table_policies_iter =
        _all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable));
    if (table_policies_iter == _all_resource_policies.end()) {
        LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app envs.");
        return dsn::ERR_OK;
    }

    dsn::replication::configuration_list_apps_request list_req;
    list_req.status = dsn::app_status::AS_AVAILABLE;
    dsn::replication::configuration_list_apps_response list_resp;
    _meta_svc->get_server_state()->list_apps(list_req, list_resp);
    LOG_AND_RETURN_NOT_OK(ERROR, list_resp.err, "list_apps failed.");

    // True if 'names' contains 'name' itself or the wildcard "*".
    const auto covers = [](const auto &names, const std::string &name) {
        return names.count(name) != 0 || names.count("*") != 0;
    };

    for (const auto &app : list_resp.infos) {
        const std::string table_name = get_table_name_from_app_name(app.app_name);
        std::string database_name = get_database_name_from_app_name(app.app_name);
        // Tables whose names carry no database part are matched against the database named by
        // 'legacy_table_database_mapping_policy_name'.
        if (database_name.empty()) {
            database_name = FLAGS_legacy_table_database_mapping_policy_name;
        }

        std::vector<matched_database_table_policy> matched_policies;
        for (const auto &policy : table_policies_iter->second) {
            // A policy applies only if it covers both the database and the table.
            if (!covers(policy.database_names, database_name) ||
                !covers(policy.table_names, table_name)) {
                continue;
            }
            matched_database_table_policy matched({database_name, table_name, policy.policies});
            // Record "*" instead of the concrete name when only the wildcard matched.
            if (policy.database_names.count(database_name) == 0) {
                CHECK(policy.database_names.count("*") != 0,
                      "the list of database_name must contain *");
                matched.matched_database_name = "*";
            }
            if (policy.table_names.count(table_name) == 0) {
                CHECK(policy.table_names.count("*") != 0, "the list of table_name must contain *");
                matched.matched_table_name = "*";
            }
            matched_policies.emplace_back(matched);
        }

        auto req = std::make_unique<dsn::replication::configuration_update_app_env_request>();
        req->__set_app_name(app.app_name);
        req->__set_keys({dsn::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES});

        // NOTE(review): wait_all_task() is called before reading rpc.response(), presumably to
        // block until the env update issued above has finished — confirm in server_state.
        if (!matched_policies.empty()) {
            req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_SET);
            req->__set_values(
                {json::json_forwarder<std::vector<matched_database_table_policy>>::encode(
                     matched_policies)
                     .to_string()});
            dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_USE_RANGER_ACCESS_CONTROL);
            _meta_svc->get_server_state()->set_app_envs(rpc);
            _meta_svc->get_server_state()->wait_all_task();
            LOG_AND_RETURN_NOT_OK(ERROR, rpc.response().err, "set_app_envs failed.");
            continue;
        }

        // No policy matches this table: remove the Ranger policies from its app envs.
        req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_DEL);
        dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_USE_RANGER_ACCESS_CONTROL);
        _meta_svc->get_server_state()->del_app_envs(rpc);
        _meta_svc->get_server_state()->wait_all_task();
        LOG_AND_RETURN_NOT_OK(ERROR, rpc.response().err, "del_app_envs failed.");
    }
    LOG_DEBUG("Sync policies to app envs succeeded.");
    return dsn::ERR_OK;
}
// Returns the database part of an app name of the form '{database_name}.{table_name}', or an
// empty string for a legacy name that does not follow this rule.
// NOTE(review): relies on utils::find_string_prefix() returning the text before the first '.',
// and presumably an empty string or the whole name when there is none — confirm in utils.
std::string get_database_name_from_app_name(const std::string &app_name)
{
    const std::string database_name = utils::find_string_prefix(app_name, '.');
    const bool is_legacy_name = database_name.empty() || database_name == app_name;
    return is_legacy_name ? std::string() : database_name;
}
// Returns the table part of an app name of the form '{database_name}.{table_name}'. A legacy
// name without a database part is returned unchanged.
std::string get_table_name_from_app_name(const std::string &app_name)
{
    const std::string database_name = get_database_name_from_app_name(app_name);
    if (database_name.empty()) {
        return app_name;
    }
    // Drop "<database_name>." to keep only the table part.
    return app_name.substr(database_name.size() + 1);
}
} // namespace ranger
} // namespace dsn