| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/ranger/ranger_client.h" |
| |
| #include <algorithm> |
| #include <cstdlib> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/common/table_util.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/ranger/ranger.pb.h" |
| #include "kudu/security/init.h" |
| #include "kudu/subprocess/server.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/flag_validators.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/string_case.h" |
| #include "kudu/util/subprocess.h" |
| |
| DEFINE_string(ranger_config_path, "", |
| "Path to directory containing Ranger client configuration. " |
| "When set, Ranger integration is enabled, fine-grained access " |
| "control is enforced, and clients are issued authorization " |
| "tokens. In addition, both --ranger_java_path and --ranger_jar_path " |
| "flags need to be set properly for Ranger integration to work."); |
| |
| DEFINE_string(ranger_java_path, "", |
| "Path where the Java binary was installed. If the value " |
| "isn't an absolute path (e.g. 'java'), it will be evaluated " |
| "using the Kudu user's PATH. If not specified, $JAVA_HOME/bin/java " |
| "is used. If $JAVA_HOME is not found, Kudu will attempt to " |
| "find 'java' in the Kudu user's PATH."); |
| |
| DEFINE_string(ranger_jar_path, "", |
| "Path to the JAR file containing the Ranger subprocess. If " |
| "not specified, the default JAR file path is expected to be " |
| "next to the master binary."); |
| |
| DEFINE_string(ranger_receiver_fifo_dir, "", |
| "Directory in which to create a fifo used to receive messages " |
| "from the Ranger subprocess. Existing fifos at this path will be " |
| "overwritten. If not specified, a fifo will be created in the " |
| "--ranger_config_path directory."); |
| TAG_FLAG(ranger_receiver_fifo_dir, advanced); |
| |
| DEFINE_string(ranger_log_config_dir, "", |
| "Directory in which to look for a kudu-ranger-subprocess-log4j2.properties " |
| "file. If empty, will use the value of --log_dir. If such a file does not " |
| "exist, a properties file will be created to honor Kudu's logging " |
| "configurations."); |
| TAG_FLAG(ranger_log_config_dir, advanced); |
| TAG_FLAG(ranger_log_config_dir, evolving); |
| |
| DEFINE_bool(ranger_overwrite_log_config, true, |
| "Whether to overwrite any existing logging configuration file, if found."); |
| TAG_FLAG(ranger_overwrite_log_config, advanced); |
| TAG_FLAG(ranger_overwrite_log_config, evolving); |
| |
| DEFINE_string(ranger_log_level, "info", |
| "Log level to use in the Ranger Java subprocess. Supports \"all\", \"trace\", " |
| "\"debug\", \"info\", \"warn\", \"error\", \"fatal\", and \"off\""); |
| TAG_FLAG(ranger_log_level, advanced); |
| TAG_FLAG(ranger_log_level, evolving); |
| |
| DEFINE_bool(ranger_logtostdout, false, |
| "Whether to have the Ranger subprocess log to stdout."); |
| TAG_FLAG(ranger_logtostdout, advanced); |
| TAG_FLAG(ranger_logtostdout, evolving); |
| |
| DECLARE_int32(max_log_files); |
| DECLARE_uint32(max_log_size); |
| DECLARE_string(log_dir); |
| |
| METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms, |
| "Ranger subprocess execution time (ms)", |
| kudu::MetricUnit::kMilliseconds, |
| "Duration of time in ms spent executing the Ranger subprocess request, excluding " |
| "time spent spent in the subprocess queues", |
| kudu::MetricLevel::kInfo, |
| 60000LU, 1); |
| METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_length, |
| "Ranger subprocess inbound queue length", |
| kudu::MetricUnit::kMessages, |
| "Number of request messages in the Ranger subprocess' inbound request queue", |
| kudu::MetricLevel::kInfo, |
| 1000, 1); |
| METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_time_ms, |
| "Ranger subprocess inbound queue time (ms)", |
| kudu::MetricUnit::kMilliseconds, |
| "Duration of time in ms spent in the Ranger subprocess' inbound request queue", |
| kudu::MetricLevel::kInfo, |
| 60000LU, 1); |
| METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_length, |
| "Ranger subprocess outbound queue length", |
| kudu::MetricUnit::kMessages, |
| "Number of request messages in the Ranger subprocess' outbound response queue", |
| kudu::MetricLevel::kInfo, |
| 1000, 1); |
| METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_time_ms, |
| "Ranger subprocess outbound queue time (ms)", |
| kudu::MetricUnit::kMilliseconds, |
| "Duration of time in ms spent in the Ranger subprocess' outbound response queue", |
| kudu::MetricLevel::kInfo, |
| 60000LU, 1); |
| METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_size_bytes, |
| "Ranger server inbound queue size (bytes)", |
| kudu::MetricUnit::kBytes, |
| "Number of bytes in the inbound response queue of the Ranger server, recorded " |
| "at the time a new response is read from the pipe and added to the inbound queue", |
| kudu::MetricLevel::kInfo, |
| 4 * 1024 * 1024, 1); |
| METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms, |
| "Ranger server inbound queue time (ms)", |
| kudu::MetricUnit::kMilliseconds, |
| "Duration of time in ms spent in the Ranger server's inbound response queue", |
| kudu::MetricLevel::kInfo, |
| 60000LU, 1); |
| METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_size_bytes, |
| "Ranger server outbound queue size (bytes)", |
| kudu::MetricUnit::kBytes, |
| "Number of bytes in the outbound request queue of the Ranger server, recorded " |
| "at the time a new request is added to the outbound request queue", |
| kudu::MetricLevel::kInfo, |
| 4 * 1024 * 1024, 1); |
| METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms, |
| "Ranger server outbound queue time (ms)", |
| kudu::MetricUnit::kMilliseconds, |
| "Duration of time in ms spent in the Ranger server's outbound request queue", |
| kudu::MetricLevel::kInfo, |
| 60000LU, 1); |
| |
| DECLARE_string(keytab_file); |
| DECLARE_string(principal); |
| |
| namespace kudu { |
| namespace ranger { |
| |
| using kudu::security::GetKrb5ConfigFile; |
| using kudu::subprocess::SubprocessMetrics; |
| using kudu::subprocess::SubprocessServer; |
| using std::move; |
| using std::pair; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace { |
| |
| const char* kDenyNonRangerTableTemplate = "Denying action on table with invalid name $0. " |
| "Use 'kudu table rename_table' to rename it to " |
| "a Ranger-compatible name."; |
| const char* kMainClass = "org.apache.kudu.subprocess.ranger.RangerSubprocessMain"; |
| const char* kRangerClientLogFilename = "kudu-ranger-subprocess"; |
| const char* kRangerClientPropertiesFilename = "kudu-ranger-subprocess-log4j2.properties"; |
| |
| // Returns the path to the JAR file containing the Ranger subprocess. |
| string RangerJarPath() { |
| if (FLAGS_ranger_jar_path.empty()) { |
| string exe; |
| CHECK_OK(Env::Default()->GetExecutablePath(&exe)); |
| const string bin_dir = DirName(exe); |
| return JoinPathSegments(bin_dir, "kudu-subprocess.jar"); |
| } |
| return FLAGS_ranger_jar_path; |
| } |
| |
| // Returns the classpath to be used for the Ranger subprocess. |
| string JavaClasspath() { |
| DCHECK(!FLAGS_ranger_config_path.empty()); |
| return Substitute("$0:$1", RangerJarPath(), FLAGS_ranger_config_path); |
| } |
| |
| string RangerFifoBase() { |
| DCHECK(!FLAGS_ranger_config_path.empty()); |
| const string& fifo_dir = FLAGS_ranger_receiver_fifo_dir.empty() ? |
| FLAGS_ranger_config_path : FLAGS_ranger_receiver_fifo_dir; |
| return JoinPathSegments(fifo_dir, "ranger_receiever_fifo"); |
| } |
| |
| string JavaPath() { |
| if (FLAGS_ranger_java_path.empty()) { |
| auto java_home = getenv("JAVA_HOME"); |
| if (!java_home) { |
| return "java"; |
| } |
| return JoinPathSegments(java_home, "bin/java"); |
| } |
| return FLAGS_ranger_java_path; |
| } |
| |
| bool ValidateRangerConfiguration() { |
| if (!FLAGS_ranger_config_path.empty()) { |
| // First, check the specified Java path. |
| const string java_path = JavaPath(); |
| if (!Env::Default()->FileExists(java_path)) { |
| // Otherwise, since the specified path is not absolute, check if |
| // the Java binary is on the PATH. |
| string p; |
| Status s = Subprocess::Call({ "which", java_path }, "", &p); |
| if (!s.ok()) { |
| LOG(ERROR) << Substitute("--ranger_java_path has invalid java binary path: $0", |
| java_path); |
| return false; |
| } |
| } |
| const string ranger_jar_path = RangerJarPath(); |
| if (!Env::Default()->FileExists(ranger_jar_path)) { |
| LOG(ERROR) << Substitute("--ranger_jar_path has invalid JAR file path: $0", |
| ranger_jar_path); |
| return false; |
| } |
| } |
| return true; |
| } |
| GROUP_FLAG_VALIDATOR(ranger_config_flags, ValidateRangerConfiguration); |
| |
| bool ValidateLog4jLevel(const char* /*flagname*/, const string& value) { |
| static const vector<string> kLevels = { |
| "all", |
| "trace", |
| "debug", |
| "info", |
| "warn", |
| "error", |
| "fatal", |
| "off", |
| }; |
| string vlower = value; |
| ToLowerCase(&vlower); |
| if (std::any_of(kLevels.begin(), kLevels.end(), |
| [&vlower] (const string& level) { return level == vlower; })) { |
| return true; |
| } |
| LOG(ERROR) << Substitute("expected one of {$0} but got $1", |
| JoinStrings(kLevels, ", "), value); |
| return false; |
| } |
| DEFINE_validator(ranger_log_level, &ValidateLog4jLevel); |
| |
| Status GetOrCreateLog4j2PropertiesFile(Env* env, string* logging_properties_path) { |
| const string log_conf_dir = FLAGS_ranger_log_config_dir.empty() ? |
| FLAGS_log_dir : FLAGS_ranger_log_config_dir; |
| // It's generally expected that --log_dir has already been created elsewhere. |
| if (!FLAGS_ranger_log_config_dir.empty() && !env->FileExists(log_conf_dir)) { |
| RETURN_NOT_OK(env->CreateDir(log_conf_dir)); |
| } |
| const string log4j2_properties_path = JoinPathSegments(log_conf_dir, |
| kRangerClientPropertiesFilename); |
| string file_state; |
| bool should_create_file = true; |
| if (env->FileExists(log4j2_properties_path)) { |
| if (FLAGS_ranger_overwrite_log_config) { |
| file_state = "overwritten"; |
| } else { |
| file_state = "existing"; |
| should_create_file = false; |
| } |
| } else { |
| file_state = "new"; |
| } |
| if (should_create_file) { |
| // Write our new properties file to a tmp file first so other processes |
| // don't read a partial file (not expected, but just in case). |
| unique_ptr<WritableFile> tmp_file; |
| string tmp_path; |
| WritableFileOptions opts; |
| opts.is_sensitive = false; |
| RETURN_NOT_OK(env->NewTempWritableFile(opts, |
| Substitute("$0.XXXXXX", log4j2_properties_path), |
| &tmp_path, &tmp_file)); |
| // If anything fails, clean up the tmp file. |
| auto tmp_deleter = MakeScopedCleanup([&] { |
| WARN_NOT_OK(env->DeleteFile(tmp_path), |
| Substitute("Couldn't clean up tmp file $0", tmp_path)); |
| }); |
| string exe; |
| RETURN_NOT_OK(env->GetExecutablePath(&exe)); |
| const string program_name = BaseName(exe); |
| string hostname; |
| RETURN_NOT_OK(GetHostname(&hostname)); |
| const string log_filename = Substitute("$0.$1", kRangerClientLogFilename, hostname); |
| RETURN_NOT_OK(tmp_file->Append( |
| subprocess::Log4j2Properties(program_name, FLAGS_log_dir, log_filename, |
| FLAGS_max_log_size, FLAGS_max_log_files, |
| FLAGS_ranger_log_level, |
| FLAGS_ranger_logtostdout))); |
| RETURN_NOT_OK(tmp_file->Sync()); |
| RETURN_NOT_OK(tmp_file->Close()); |
| // Now atomically swap in our file. |
| RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, log4j2_properties_path), |
| Substitute("Failed to rename tmp file $0 to $1", tmp_path, log4j2_properties_path)); |
| tmp_deleter.cancel(); |
| } |
| LOG(INFO) << Substitute("Using $0 properties file: $1", |
| file_state, log4j2_properties_path); |
| *logging_properties_path = log4j2_properties_path; |
| return Status::OK(); |
| } |
| |
| // Builds the arguments to start the Ranger subprocess with the given receiver |
| // fifo path and logging properties file. Specifically pass the principal and |
| // keytab file that the Ranger subprocess will log in with if Kerberos is |
| // enabled. 'args' has the final arguments. Returns 'OK' if arguments |
| // successfully created, error otherwise. |
| Status BuildArgv(const string& fifo_path, const string& log_properties_path, |
| vector<string>* argv) { |
| DCHECK(argv); |
| DCHECK(!FLAGS_ranger_config_path.empty()); |
| // Pass the required arguments to run the Ranger subprocess. |
| vector<string> ret = { |
| JavaPath(), |
| Substitute("-Djava.security.krb5.conf=$0", GetKrb5ConfigFile()), |
| Substitute("-Dlog4j2.configurationFile=$0", log_properties_path), |
| "-cp", JavaClasspath(), kMainClass, |
| }; |
| // When Kerberos is enabled in Kudu, pass both Kudu principal and keytab file |
| // to the Ranger subprocess. |
| if (!FLAGS_keytab_file.empty()) { |
| string configured_principal; |
| RETURN_NOT_OK_PREPEND(security::GetConfiguredPrincipal(FLAGS_principal, &configured_principal), |
| "unable to get the configured principal from for the Ranger subprocess"); |
| ret.emplace_back("-i"); |
| ret.emplace_back(std::move(configured_principal)); |
| ret.emplace_back("-k"); |
| ret.emplace_back(FLAGS_keytab_file); |
| } |
| ret.emplace_back("-o"); |
| ret.emplace_back(fifo_path); |
| *argv = std::move(ret); |
| return Status::OK(); |
| } |
| |
| } // anonymous namespace |
| |
| #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity) |
| RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) { |
| HISTINIT(sp_inbound_queue_length, ranger_subprocess_inbound_queue_length); |
| HISTINIT(sp_inbound_queue_time_ms, ranger_subprocess_inbound_queue_time_ms); |
| HISTINIT(sp_outbound_queue_length, ranger_subprocess_outbound_queue_length); |
| HISTINIT(sp_outbound_queue_time_ms, ranger_subprocess_outbound_queue_time_ms); |
| HISTINIT(sp_execution_time_ms, ranger_subprocess_execution_time_ms); |
| HISTINIT(server_inbound_queue_size_bytes, ranger_server_inbound_queue_size_bytes); |
| HISTINIT(server_inbound_queue_time_ms, ranger_server_inbound_queue_time_ms); |
| HISTINIT(server_outbound_queue_size_bytes, ranger_server_outbound_queue_size_bytes); |
| HISTINIT(server_outbound_queue_time_ms, ranger_server_outbound_queue_time_ms); |
| } |
| #undef HISTINIT |
| |
| RangerClient::RangerClient(Env* env, const scoped_refptr<MetricEntity>& metric_entity) |
| : env_(env), metric_entity_(metric_entity) { |
| DCHECK(metric_entity); |
| } |
| |
| Status RangerClient::Start() { |
| VLOG(1) << "Initializing Ranger subprocess server"; |
| string log_properties_path; |
| RETURN_NOT_OK(GetOrCreateLog4j2PropertiesFile(env_, &log_properties_path)); |
| const string fifo_path = SubprocessServer::FifoPath(RangerFifoBase()); |
| vector<string> argv; |
| RETURN_NOT_OK(BuildArgv(fifo_path, log_properties_path, &argv)); |
| subprocess_.reset(new RangerSubprocess(env_, fifo_path, std::move(argv), metric_entity_, |
| "Ranger client subprocess")); |
| return subprocess_->Start(); |
| } |
| |
| // TODO(abukor): refactor to avoid code duplication |
| Status RangerClient::AuthorizeAction(const string& user_name, const ActionPB& action, |
| const string& database, const string& table, bool is_owner, |
| bool requires_delegate_admin, bool* authorized, |
| Scope scope) { |
| DCHECK(subprocess_); |
| RangerRequestListPB req_list; |
| RangerResponseListPB resp_list; |
| req_list.set_user(user_name); |
| |
| RangerRequestPB* req = req_list.add_requests(); |
| |
| req->set_action(action); |
| req->set_database(database); |
| req->set_requires_delegate_admin(requires_delegate_admin); |
| req->set_is_owner(is_owner); |
| // Only pass the table name if this is table level request. |
| if (scope == Scope::TABLE) { |
| req->set_table(table); |
| } |
| |
| RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list)); |
| |
| CHECK_EQ(1, resp_list.responses_size()); |
| *authorized = resp_list.responses().begin()->allowed(); |
| return Status::OK(); |
| } |
| |
| Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name, const ActionPB& action, |
| const string& database, const string& table, |
| bool is_owner, |
| unordered_set<string>* column_names) { |
| DCHECK(subprocess_); |
| DCHECK(!column_names->empty()); |
| |
| RangerRequestListPB req_list; |
| RangerResponseListPB resp_list; |
| req_list.set_user(user_name); |
| |
| for (const auto& col : *column_names) { |
| auto req = req_list.add_requests(); |
| req->set_action(action); |
| req->set_database(database); |
| req->set_table(table); |
| req->set_column(col); |
| req->set_is_owner(is_owner); |
| } |
| |
| RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list)); |
| |
| DCHECK_EQ(column_names->size(), resp_list.responses_size()); |
| |
| unordered_set<string> allowed_columns; |
| for (auto i = 0; i < req_list.requests_size(); ++i) { |
| if (resp_list.responses(i).allowed()) { |
| EmplaceOrDie(&allowed_columns, move(req_list.requests(i).column())); |
| } |
| } |
| |
| *column_names = move(allowed_columns); |
| |
| return Status::OK(); |
| } |
| |
| Status RangerClient::AuthorizeActionMultipleTables(const string& user_name, const ActionPB& action, |
| unordered_map<string, bool>* tables) { |
| DCHECK(subprocess_); |
| |
| RangerRequestListPB req_list; |
| RangerResponseListPB resp_list; |
| req_list.set_user(user_name); |
| |
| vector<pair<string, bool>> orig_table_names; |
| |
| for (const auto& table : *tables) { |
| string db; |
| Slice tbl; |
| |
| auto s = ParseRangerTableIdentifier(table.first, &db, &tbl); |
| if (PREDICT_TRUE(s.ok())) { |
| orig_table_names.emplace_back(table); |
| |
| auto req = req_list.add_requests(); |
| req->set_action(action); |
| req->set_database(db); |
| req->set_table(tbl.ToString()); |
| req->set_is_owner(table.second); |
| } else { |
| LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table.first); |
| } |
| } |
| |
| RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list)); |
| |
| DCHECK_EQ(orig_table_names.size(), resp_list.responses_size()); |
| |
| unordered_map<string, bool> allowed_tables; |
| for (auto i = 0; i < orig_table_names.size(); ++i) { |
| if (resp_list.responses(i).allowed()) { |
| EmplaceOrDie(&allowed_tables, move(orig_table_names[i])); |
| } |
| } |
| |
| *tables = move(allowed_tables); |
| |
| return Status::OK(); |
| } |
| |
| Status RangerClient::AuthorizeActions(const string& user_name, const string& database, |
| const string& table, bool is_owner, |
| unordered_set<ActionPB, ActionHash>* actions, |
| Scope scope) { |
| DCHECK(subprocess_); |
| DCHECK(!actions->empty()); |
| |
| RangerRequestListPB req_list; |
| RangerResponseListPB resp_list; |
| req_list.set_user(user_name); |
| |
| for (const auto& action : *actions) { |
| auto req = req_list.add_requests(); |
| req->set_action(action); |
| req->set_database(database); |
| if (scope == Scope::TABLE) { |
| req->set_table(table); |
| req->set_is_owner(is_owner); |
| } |
| } |
| |
| RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list)); |
| |
| DCHECK_EQ(actions->size(), resp_list.responses_size()); |
| |
| unordered_set<ActionPB, ActionHash> allowed_actions; |
| for (auto i = 0; i < req_list.requests_size(); ++i) { |
| if (resp_list.responses(i).allowed()) { |
| EmplaceOrDie(&allowed_actions, move(req_list.requests(i).action())); |
| } |
| } |
| |
| *actions = move(allowed_actions); |
| |
| return Status::OK(); |
| } |
| |
| Status RangerClient::RefreshPolicies() { |
| DCHECK(subprocess_); |
| |
| RangerRequestListPB req_list; |
| RangerResponseListPB resp_list; |
| |
| req_list.mutable_control_request()->set_refresh_policies(true); |
| |
| RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list)); |
| |
| if (PREDICT_TRUE(!resp_list.control_response().success())) { |
| string err = resp_list.control_response().error(); |
| return Status::RemoteError(err); |
| } |
| |
| return Status::OK(); |
| } |
| |
| } // namespace ranger |
| } // namespace kudu |