// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/ranger/ranger_client.h"

#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include <boost/container_hash/extensions.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <google/protobuf/any.pb.h>
#include <gtest/gtest.h>

#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/postgres/mini_postgres.h"
#include "kudu/ranger/mini_ranger.h"
#include "kudu/ranger/ranger.pb.h"
#include "kudu/subprocess/server.h"
#include "kudu/subprocess/subprocess.pb.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DECLARE_string(log_dir);
DECLARE_string(ranger_config_path);
DECLARE_string(ranger_log_config_dir);
DECLARE_string(ranger_log_level);
DECLARE_bool(ranger_logtostdout);
DECLARE_bool(ranger_overwrite_log_config);

using boost::hash_combine;
using kudu::env_util::ListFilesInDir;
using kudu::subprocess::SubprocessMetrics;
using kudu::subprocess::SubprocessRequestPB;
using kudu::subprocess::SubprocessResponsePB;
using kudu::subprocess::SubprocessServer;
using std::move;
using std::string;
using std::unordered_map;
using std::unordered_set;
using std::vector;
using strings::SkipEmpty;
using strings::Split;
using strings::Substitute;
using kudu::postgres::MiniPostgres;

namespace kudu {
namespace ranger {

struct AuthorizedAction {
  string user_name;
  ActionPB action;
  string database_name;
  string table_name;
  string column_name;

  bool operator==(const AuthorizedAction& rhs) const {
    return user_name == rhs.user_name &&
      action == rhs.action &&
      database_name == rhs.database_name &&
      table_name == rhs.table_name &&
      column_name == rhs.column_name;
  }
};

struct AuthorizedActionHash {
 public:
  size_t operator()(const AuthorizedAction& action) const {
    size_t seed = 0;
    hash_combine(seed, action.action);
    hash_combine(seed, action.column_name);
    hash_combine(seed, action.database_name);
    hash_combine(seed, action.table_name);
    hash_combine(seed, action.user_name);

    return seed;
  }
};

class MockSubprocessServer : public SubprocessServer {
 public:
  unordered_set<AuthorizedAction, AuthorizedActionHash> next_response_;

  Status Init() override {
    // don't want to start anything
    return Status::OK();
  }

  ~MockSubprocessServer() override {}

  MockSubprocessServer()
      : SubprocessServer(Env::Default(), "", {"mock"}, SubprocessMetrics()) {}

  Status Execute(SubprocessRequestPB* req,
                 SubprocessResponsePB* resp) override {
    RangerRequestListPB req_list;
    CHECK(req->request().UnpackTo(&req_list));

    RangerResponseListPB resp_list;

    for (const RangerRequestPB& req : req_list.requests()) {
      AuthorizedAction action;
      action.user_name = req_list.user();
      action.action = req.action();
      action.table_name = req.table();
      action.database_name = req.database();
      action.column_name = req.has_column() ? req.column() : "";

      auto r = resp_list.add_responses();
      r->set_allowed(ContainsKey(next_response_, action));
    }

    resp->mutable_response()->PackFrom(resp_list);

    return Status::OK();
  }
};

class RangerClientTest : public KuduTest {
 public:
  RangerClientTest() :
    client_(env_, METRIC_ENTITY_server.Instantiate(&metric_registry_, "ranger_client-test")) {}

  void SetUp() override {
    std::unique_ptr<MockSubprocessServer> server(new MockSubprocessServer());
    next_response_ = &server->next_response_;
    client_.ReplaceServerForTests(std::move(server));
  }

  void Allow(string user_name, ActionPB action, string database_name,
             string table_name, string column_name = "") {
    AuthorizedAction authorized_action;
    authorized_action.user_name = move(user_name);
    authorized_action.action = action;
    authorized_action.database_name = move(database_name);
    authorized_action.table_name = move(table_name);
    authorized_action.column_name = move(column_name);

    next_response_->emplace(authorized_action);
  }

 protected:
  MetricRegistry metric_registry_;
  unordered_set<AuthorizedAction, AuthorizedActionHash>* next_response_;
  RangerClient client_;
};

TEST_F(RangerClientTest, TestAuthorizeCreateTableUnauthorized) {
  bool authorized;
  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar", "baz", /*is_owner=*/false,
                                    /*requires_delegate_admin=*/false, &authorized));
  ASSERT_FALSE(authorized);
}

TEST_F(RangerClientTest, TestAuthorizeCreateTableAuthorized) {
  Allow("jdoe", ActionPB::CREATE, "foo", "bar");
  bool authorized;
  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo", "bar", /*is_owner=*/false,
                                    /*requires_delegate_admin=*/false, &authorized));
  ASSERT_TRUE(authorized);
}

TEST_F(RangerClientTest, TestAuthorizeListNoTables) {
  unordered_map<string, bool> tables;
  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
  ASSERT_EQ(0, tables.size());
}

TEST_F(RangerClientTest, TestAuthorizeListNoTablesAuthorized) {
  unordered_map<string, bool> tables;
  tables.emplace("foo.bar", false);
  tables.emplace("foo.baz", false);
  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
  ASSERT_EQ(0, tables.size());
}

TEST_F(RangerClientTest, TestAuthorizeMetadataSubsetOfTablesAuthorized) {
  Allow("jdoe", ActionPB::METADATA, "default", "foobar");
  unordered_map<string, bool> tables;
  tables.emplace("default.foobar", false);
  tables.emplace("barbaz", false);
  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
  ASSERT_EQ(1, tables.size());
  ASSERT_EQ("default.foobar", tables.begin()->first);
}

TEST_F(RangerClientTest, TestAuthorizeMetadataAllAuthorized) {
  Allow("jdoe", ActionPB::METADATA, "default", "foobar");
  Allow("jdoe", ActionPB::METADATA, "default", "barbaz");
  unordered_map<string, bool> tables;
  tables.emplace("default.foobar", false);
  tables.emplace("barbaz", false);
  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
  ASSERT_EQ(2, tables.size());
  ASSERT_TRUE(ContainsKey(tables, "default.foobar"));
  ASSERT_TRUE(ContainsKey(tables, "barbaz"));
}

TEST_F(RangerClientTest, TestAuthorizeMetadataAllNonRanger) {
  unordered_map<string, bool> tables;
  tables.emplace("foo.", false);
  tables.emplace(".bar", false);
  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
  ASSERT_EQ(0, tables.size());
}

TEST_F(RangerClientTest, TestAuthorizeMetadataNoneAuthorizedContainsNonRanger) {
  unordered_map<string, bool> tables;
  tables.emplace("foo.", false);
  tables.emplace(".bar", false);
  tables.emplace("foo.bar", false);
  tables.emplace("foo.baz", false);
  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
  ASSERT_EQ(0, tables.size());
}

TEST_F(RangerClientTest, TestAuthorizeMetadataAllAuthorizedContainsNonRanger) {
  Allow("jdoe", ActionPB::METADATA, "default", "foobar");
  Allow("jdoe", ActionPB::METADATA, "default", "barbaz");
  unordered_map<string, bool> tables;
  tables.emplace("default.foobar", false);
  tables.emplace("barbaz", false);
  tables.emplace("foo.", false);
  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
  ASSERT_EQ(2, tables.size());
  ASSERT_TRUE(ContainsKey(tables, "default.foobar"));
  ASSERT_TRUE(ContainsKey(tables, "barbaz"));
  ASSERT_FALSE(ContainsKey(tables, "foo."));
}

TEST_F(RangerClientTest, TestAuthorizeScanSubsetAuthorized) {
  Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col1");
  Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col3");
  unordered_set<string> columns;
  columns.emplace("col1");
  columns.emplace("col2");
  columns.emplace("col3");
  columns.emplace("col4");
  ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
                                                   "default", "foobar", /*is_owner=*/false,
                                                   &columns));
  ASSERT_EQ(2, columns.size());
  ASSERT_TRUE(ContainsKey(columns, "col1"));
  ASSERT_TRUE(ContainsKey(columns, "col3"));
  ASSERT_FALSE(ContainsKey(columns, "col2"));
  ASSERT_FALSE(ContainsKey(columns, "col4"));
}

TEST_F(RangerClientTest, TestAuthorizeScanAllColumnsAuthorized) {
  Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col1");
  Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col2");
  Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col3");
  Allow("jdoe", ActionPB::SELECT, "default", "foobar", "col4");
  unordered_set<string> columns;
  columns.emplace("col1");
  columns.emplace("col2");
  columns.emplace("col3");
  columns.emplace("col4");
  ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
                                                   "default", "foobar", /*is_owner=*/false,
                                                   &columns));
  ASSERT_EQ(4, columns.size());
  ASSERT_TRUE(ContainsKey(columns, "col1"));
  ASSERT_TRUE(ContainsKey(columns, "col2"));
  ASSERT_TRUE(ContainsKey(columns, "col3"));
  ASSERT_TRUE(ContainsKey(columns, "col4"));
}

TEST_F(RangerClientTest, TestAuthorizeScanNoColumnsAuthorized) {
  unordered_set<string> columns;
  for (int i = 0; i < 4; ++i) {
    columns.emplace(Substitute("col$0", i));
  }
  ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
                                                   "default", "foobar", /*is_owner=*/false,
                                                   &columns));
  ASSERT_EQ(0, columns.size());
}

TEST_F(RangerClientTest, TestAuthorizeActionsNoneAuthorized) {
  unordered_set<ActionPB, ActionHash> actions;
  actions.emplace(ActionPB::DROP);
  actions.emplace(ActionPB::SELECT);
  actions.emplace(ActionPB::INSERT);
  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", /*is_owner=*/false, &actions));
  ASSERT_EQ(0, actions.size());
}

TEST_F(RangerClientTest, TestAuthorizeActionsSomeAuthorized) {
  Allow("jdoe", ActionPB::SELECT, "default", "foobar");
  unordered_set<ActionPB, ActionHash> actions;
  actions.emplace(ActionPB::DROP);
  actions.emplace(ActionPB::SELECT);
  actions.emplace(ActionPB::INSERT);
  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", /*is_owner=*/false, &actions));
  ASSERT_EQ(1, actions.size());
  ASSERT_TRUE(ContainsKey(actions, ActionPB::SELECT));
}

TEST_F(RangerClientTest, TestAuthorizeActionsAllAuthorized) {
  Allow("jdoe", ActionPB::DROP, "default", "foobar");
  Allow("jdoe", ActionPB::SELECT, "default", "foobar");
  Allow("jdoe", ActionPB::INSERT, "default", "foobar");
  unordered_set<ActionPB, ActionHash> actions;
  actions.emplace(ActionPB::DROP);
  actions.emplace(ActionPB::SELECT);
  actions.emplace(ActionPB::INSERT);
  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", /*is_owner=*/false, &actions));
  ASSERT_EQ(3, actions.size());
}

class RangerClientTestBase : public KuduTest {
 public:
  RangerClientTestBase()
      : test_dir_(GetTestDataDirectory()) {}

  void SetUp() override {
    metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_, "ranger_client-test");
    FLAGS_ranger_log_level = "debug";
    FLAGS_ranger_logtostdout = true;
    FLAGS_ranger_config_path = test_dir_;
    FLAGS_ranger_log_config_dir = JoinPathSegments(test_dir_, "log_conf");
    FLAGS_log_dir = JoinPathSegments(test_dir_, "logs");
    ASSERT_OK(env_->CreateDir(FLAGS_log_dir));
    ASSERT_OK(InitializeRanger());
  }

  Status InitializeRanger() {
    ranger_.reset(new MiniRanger("127.0.0.1", std::make_shared<MiniPostgres>("127.0.0.1")));
    RETURN_NOT_OK(ranger_->Start());
    // Create a policy so the Ranger client policy refresher can pick something
    // up. In some environments the absense of policies can cause the plugin to
    // wait for a policy to appear indefinitely.
    // See KUDU-3154 and RANGER-2899 for more details.
    // TODO(awong): remove this when RANGER-2899 is fixed.
    PolicyItem item({"user"}, {ActionPB::METADATA}, false);
    AuthorizationPolicy policy;
    policy.databases.emplace_back("db");
    policy.tables.emplace_back("table");
    policy.items.emplace_back(std::move(item));
    RETURN_NOT_OK(ranger_->AddPolicy(std::move(policy)));
    RETURN_NOT_OK(ranger_->CreateClientConfig(test_dir_));
    client_.reset(new RangerClient(env_, metric_entity_));
    return client_->Start();
  }

 protected:
  const string test_dir_;
  MetricRegistry metric_registry_;

  scoped_refptr<MetricEntity> metric_entity_;
  std::unique_ptr<MiniRanger> ranger_;
  std::unique_ptr<RangerClient> client_;
};

namespace {

// Looks in --log_dir and returns the full Kudu Ranger subprocess log filename
// and the lines contained therein.
Status GetLinesFromLogFile(Env* env, string* file, vector<string>* lines) {
  vector<string> matches;
  RETURN_NOT_OK(env->Glob(JoinPathSegments(FLAGS_log_dir, "kudu-ranger-subprocess*.log"),
                                           &matches));
  if (matches.size() != 1) {
    return Status::Corruption(Substitute("Expected one file but got $0: $1",
                                         matches.size(), JoinStrings(matches, ", ")));
  }
  faststring contents;
  const string& full_log_file = matches[0];
  RETURN_NOT_OK(ReadFileToString(env, full_log_file, &contents));
  *file = full_log_file;
  *lines = Split(contents.ToString(), "\n", SkipEmpty());
  return Status::OK();
}

} // anonymous namespace

TEST_F(RangerClientTestBase, TestLogging) {
  {
    // Check that a logging configuration was produced by the Ranger client.
    vector<string> files;
    ASSERT_OK(ListFilesInDir(env_, FLAGS_ranger_log_config_dir, &files));
    SCOPED_TRACE(JoinStrings(files, "\n"));
    ASSERT_STRINGS_ANY_MATCH(files, ".*log4j2.properties");
  }
  // Make a request. It doesn't matter whether it succeeds or not -- debug logs
  // should include info about each request.
  bool authorized;
  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", /*is_owner=*/false,
                                     /*requires_delegate_admin=*/false, &authorized));
  ASSERT_FALSE(authorized);
  {
    // Check that the Ranger client logs some DEBUG messages.
    vector<string> lines;
    string log_file;
    ASSERT_OK(GetLinesFromLogFile(env_, &log_file, &lines));
    ASSERT_STRINGS_ANY_MATCH(lines, ".*DEBUG.*");
    // Delete the log file so we can start a fresh log.
    ASSERT_OK(env_->DeleteFile(log_file));
  }
  // Try reconfiguring our logging to spit out info-level logs and start a new
  // log file by restarting the Ranger client. Since we're set up to not
  // overwrite the logging config, this won't be effective -- we should still
  // see DEBUG messages.
  FLAGS_ranger_log_level = "info";
  FLAGS_ranger_overwrite_log_config = false;
  client_.reset(new RangerClient(env_, metric_entity_));
  ASSERT_OK(client_->Start());
  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", /*is_owner=*/false,
                                     /*requires_delegate_admin=*/false, &authorized));
  ASSERT_FALSE(authorized);
  {
    // Our logs should still contain DEBUG messages since we didn't update the
    // logging configuration.
    vector<string> lines;
    string log_file;
    ASSERT_OK(GetLinesFromLogFile(env_, &log_file, &lines));
    ASSERT_STRINGS_ANY_MATCH(lines, ".*DEBUG.*");
    // Delete the log file so we can start a fresh log.
    ASSERT_OK(env_->DeleteFile(log_file));
  }
  // Now try again but this time overwrite the logging configuration.
  FLAGS_ranger_overwrite_log_config = true;
  client_.reset(new RangerClient(env_, metric_entity_));
  ASSERT_OK(client_->Start());
  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", /*is_owner=*/false,
                                     /*requires_delegate_admin=*/false, &authorized));
  ASSERT_FALSE(authorized);
  {
    // We shouldn't see any DEBUG messages since the client is configured to
    // use INFO-level logging.
    vector<string> lines;
    string unused;
    ASSERT_OK(GetLinesFromLogFile(env_, &unused, &lines));
    // We should see no DEBUG messages.
    for (const auto& l : lines) {
      ASSERT_STR_NOT_CONTAINS(l, "DEBUG");
    }
  }
}

} // namespace ranger
} // namespace kudu
