#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <google/protobuf/util/message_differencer.h>
#include <gtest/gtest.h>
#include "kudu/client/client-internal.h"
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/client.pb.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/security/crypto.h"
#include "kudu/security/openssl_util.h"
#include "kudu/security/token.pb.h"
#include "kudu/security/token_signer.h"
#include "kudu/tablet/key_value_test_schema.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using google::protobuf::util::MessageDifferencer;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
namespace kudu {
class RWMutex;
using cluster::InternalMiniCluster;
using cluster::InternalMiniClusterOptions;
using client::AuthenticationCredentialsPB;
using client::sp::shared_ptr;
using client::KuduClient;
using client::KuduClientBuilder;
using client::KuduError;
using client::KuduInsert;
using client::KuduScanner;
using client::KuduSchema;
using client::KuduSession;
using client::KuduTable;
using client::KuduTableCreator;
using security::DataFormat;
using security::PrivateKey;
using security::SignedTokenPB;
using security::TablePrivilegePB;
using security::TokenSigner;
using security::TokenSigningPrivateKeyPB;
using strings::Substitute;
namespace {
// RPC-level timeout in seconds. It is kept deliberately low so that tests
// expecting an "invalid token" error don't have to wait long for it.
constexpr int kRpcTimeoutSecs = 3;
// Operation-level timeout in seconds (used e.g. for scans in ScanFromTable),
// defined as a multiple of the RPC timeout.
constexpr int kOperationTimeoutSecs = 3 * kRpcTimeoutSecs;
// Builds an insert on 'table' that writes 'key' into both int32 columns
// (indices 0 and 1), then hands it to 'session' via Apply(). Returns the first
// error from setting a column, otherwise the result of Apply().
Status InsertKeyToTable(KuduTable* table, KuduSession* session, int key) {
  unique_ptr<KuduInsert> op(table->NewInsert());
  KuduPartialRow* new_row = op->mutable_row();
  for (const int col_idx : {0, 1}) {
    RETURN_NOT_OK(new_row->SetInt32(col_idx, key));
  }
  // The session takes over the operation, so release it from our unique_ptr.
  return session->Apply(op.release());
}
// Collects the pending errors of 'session' and returns their statuses, in the
// order the session reported them. The KuduError objects handed back by
// GetPendingErrors() are owned by this function and freed before returning.
vector<Status> GetSessionErrors(KuduSession* session) {
  vector<KuduError*> errors;
  session->GetPendingErrors(&errors, nullptr);
  // Take ownership right away so the errors are freed on every return path.
  ElementDeleter deleter(&errors);
  vector<Status> ret;
  ret.reserve(errors.size());
  for (KuduError* error : errors) {
    ret.push_back(error->status());
  }
  return ret;
}
// Scans 'table' via ScanToStrings(), discarding the stringified rows, with an
// operation timeout of kOperationTimeoutSecs. Returns the scan's status so
// callers can check for authorization or timeout errors.
Status ScanFromTable(KuduTable* table) {
  KuduScanner scanner(table);
  // SetTimeoutMillis() reports failure through its Status; don't drop it.
  RETURN_NOT_OK(scanner.SetTimeoutMillis(kOperationTimeoutSecs * 1000));
  vector<string> rows;
  return ScanToStrings(&scanner, &rows);
}
} // anonymous namespace
// Fixture for authz token tests. SetUp() enables tablet server access control
// and a 1-second authz token validity before creating an InternalMiniCluster.
// Helpers create per-user clients and read/write the client-side token cache.
class AuthzTokenTest : public KuduTest {
// NOTE(review): the constructor declarator (and presumably an access specifier
// such as `public:`, since free functions below call InsertToTable()) appear
// to be missing before this initializer list -- confirm against upstream.
: schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) {}
// Name of the test table opened by SetupClientAndTable().
const char* const kTableName = "test-table";
// User that 'client_' is created for.
const char* const kUser = "token-user";
// A second user, used to exercise "wrong user's token" scenarios.
const char* const kBadUser = "bad-token-user";
// Helper to get the authz token for 'table_id' from the client's cache.
// Returns the cache lookup's result (presumably false when no token is cached
// for 'table_id' -- TODO confirm).
static bool FetchCachedAuthzToken(
KuduClient* client, const string& table_id, SignedTokenPB* token) {
return client->data_->FetchCachedAuthzToken(table_id, token);
// Helper to store the authz token for 'table_id' to the client's cache. Lets
// tests inject arbitrary (e.g. corrupted or wrong-user) tokens.
static void StoreAuthzToken(KuduClient* client,
const string& table_id,
const SignedTokenPB& token) {
client->data_->StoreAuthzToken(table_id, token);
// Enables access control on the tablet servers, shortens authz token validity
// to 1 second, and creates the mini cluster.
void SetUp() override {
FLAGS_tserver_enforce_access_control = true;
FLAGS_authz_token_validity_seconds = 1;
// Create a table with a basic schema.
// NOTE(review): no KuduTest::SetUp(), cluster Start(), or
// SetupClientAndTable() call is visible here, so the table this comment refers
// to is never created in the visible code -- confirm nothing was dropped.
cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
// Sets up the client_ and client_table_ members.
// Creates 'client_' for kUser, uses it to build the test table (range
// partitioned on "key"), opens the table as 'client_table_', and records its
// ID in 'table_id_'.
Status SetupClientAndTable() {
RETURN_NOT_OK(CreateClientForUser(kUser, &client_));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
// NOTE(review): the start of the table-creator call chain (table name, schema,
// Create()) appears truncated; only the partitioning setter is visible.
.set_range_partition_columns({ "key" })
RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_));
table_id_ = client_table_->id();
return Status::OK();
// Inserts the next appropriate row to the table.
// Uses a fresh session from the table's client, consumes 'next_row_key_', and
// returns the result of flushing the session.
Status InsertToTable(KuduTable* table) {
shared_ptr<KuduSession> session(table->client()->NewSession());
RETURN_NOT_OK(InsertKeyToTable(table, session.get(), next_row_key_++));
return session->Flush();
// Creates a client for the given user.
Status CreateClientForUser(const string& user, shared_ptr<KuduClient>* client) const {
// Many tests will expect operations to fail, so let's get there quicker by
// setting a low timeout.
// NOTE(review): as visible, no timeout is set on 'client_builder', and 'user',
// 'authn_creds' and 'authn_pb' are never used, so the client is not actually
// bound to 'user'. Presumably the user should be set on 'authn_pb', serialized
// into 'authn_creds' and imported into the builder -- confirm.
KuduClientBuilder client_builder;
string authn_creds;
AuthenticationCredentialsPB authn_pb;
return cluster_->CreateClient(&client_builder, client);
// Gets the current number of GetTableSchema requests the master has serviced.
// This increments whenever a client opens a table or gets a new authz token.
uint64_t NumGetTableSchemaRequests() const {
const auto& ent = cluster_->mini_master()->master()->metric_entity();
// NOTE(review): the rest of this expression (instantiating the metric on 'ent'
// and reading its count) is not visible.
return METRIC_handler_latency_kudu_master_MasterService_GetTableSchema
// Inserts the next row into the table, expecting an error. Returns the
// session error, rather than the usual coarse-grained IOError.
// If Flush() does not return an IOError, its status is returned unchanged;
// if the session does not report exactly one error, a RuntimeError is returned.
Status InsertToTableSessionError(KuduTable* table) {
KuduClient* client = table->client();
shared_ptr<KuduSession> session = client->NewSession();
RETURN_NOT_OK(InsertKeyToTable(table, session.get(), next_row_key_++));
Status s = session->Flush();
if (!s.IsIOError()) {
return s;
vector<Status> errors = GetSessionErrors(session.get());
if (errors.size() != 1) {
// NOTE(review): the remaining arguments of this Substitute() call are not visible.
return Status::RuntimeError(Substitute("expected 1 error, got $0",
return errors[0];
// Schema of the key-value test table.
const KuduSchema schema_;
// The mini cluster under test.
unique_ptr<InternalMiniCluster> cluster_;
// Client authenticated as the default user.
shared_ptr<KuduClient> client_;
// Table created with 'client_'.
shared_ptr<KuduTable> client_table_;
// ID of 'client_table_', used as the key into the clients' token caches.
string table_id_;
// The next row key to insert.
int next_row_key_ = 0;
namespace {

// Requestors used to parameterize ReacquireAuthzTokenTest. Each one sends a
// single request against the given table and returns the resulting status.

// Writes the fixture's next row to 'target'.
Status InsertRequestor(AuthzTokenTest* test, KuduTable* target) {
  return test->InsertToTable(target);
}

// Scans 'target'; the fixture argument is not needed.
Status ScanRequestor(AuthzTokenTest* /*test*/, KuduTable* target) {
  return ScanFromTable(target);
}

} // anonymous namespace
// Signature of a requestor: sends one request to the given table on behalf of
// the given fixture and returns the resulting status.
using RequestorFunc = std::function<Status(AuthzTokenTest*, KuduTable*)>;

// Tests parameterized on different data operations, one per RequestorFunc.
class ReacquireAuthzTokenTest
    : public AuthzTokenTest,
      public ::testing::WithParamInterface<RequestorFunc> {};
// Test scenarios that lead the client to retrieve a new token.
TEST_P(ReacquireAuthzTokenTest, TestInvalidAuthzTokens) {
  auto client_func = GetParam();
  // First, let's do a sanity check that initial authz tokens allow the user to
  // perform all actions.
  SignedTokenPB first_token;
  ASSERT_TRUE(FetchCachedAuthzToken(client_.get(), table_id_, &first_token));
  ASSERT_OK(client_func(this, client_table_.get()));

  // The above operations shouldn't have required getting a new token.
  SignedTokenPB same_token;
  ASSERT_TRUE(FetchCachedAuthzToken(client_.get(), table_id_, &same_token));
  ASSERT_TRUE(MessageDifferencer::Equals(first_token, same_token));

  // Create a client for a different user and hand it the default user's token.
  shared_ptr<KuduClient> bad_client;
  ASSERT_OK(CreateClientForUser(kBadUser, &bad_client));
  shared_ptr<KuduTable> bad_table;
  ASSERT_OK(bad_client->OpenTable(kTableName, &bad_table));

  LOG(INFO) << "Trying to use the wrong user's token...";
  SignedTokenPB bad_token;
  ASSERT_TRUE(FetchCachedAuthzToken(client_.get(), table_id_, &bad_token));
  StoreAuthzToken(bad_client.get(), table_id_, bad_token);

  // The bad client should succeed after being told to retrieve a new token for
  // the correct user. Check that it received a different token.
  ASSERT_OK(client_func(this, bad_table.get()));
  SignedTokenPB new_token;
  ASSERT_TRUE(FetchCachedAuthzToken(bad_client.get(), table_id_, &new_token));
  ASSERT_FALSE(MessageDifferencer::Equals(bad_token, new_token));

  // Replace the token used by the client with one whose signature has been
  // corrupted. The server should respond such that the client circles back to
  // the master and gets a new token.
  LOG(INFO) << "Trying to use a bad signature...";
  // Flip the bits of the signature in place, so the token keeps its data but
  // its signature no longer verifies.
  for (char& byte : *new_token.mutable_signature()) {
    byte = static_cast<char>(~byte);
  }
  bad_token = std::move(new_token);
  StoreAuthzToken(bad_client.get(), table_id_, bad_token);
  ASSERT_OK(client_func(this, bad_table.get()));

  // The client should have received a new token.
  SignedTokenPB refreshed_token;
  ASSERT_TRUE(FetchCachedAuthzToken(bad_client.get(), table_id_, &refreshed_token));
  ASSERT_FALSE(MessageDifferencer::Equals(bad_token, refreshed_token));
}
TEST_P(ReacquireAuthzTokenTest, TestExpiredAuthzTokens) {
  // Once the cached authz token has expired, a write/scan should still
  // succeed, with the client having gone back to the master for a new token
  // (visible as additional GetTableSchema requests). This test sleeps past the
  // token validity period to let the token expire.
  const auto send_request = GetParam();
  const uint64_t reqs_before_expiry = NumGetTableSchemaRequests();
  const MonoDelta wait_for_expiry =
      MonoDelta::FromSeconds(FLAGS_authz_token_validity_seconds + 1);
  SleepFor(wait_for_expiry);
  ASSERT_OK(send_request(this, client_table_.get()));
  ASSERT_GT(NumGetTableSchemaRequests(), reqs_before_expiry);
}
// Run every ReacquireAuthzTokenTest case once per requestor: writes and scans.
INSTANTIATE_TEST_SUITE_P(RequestorFuncs, ReacquireAuthzTokenTest,
                         ::testing::Values(&InsertRequestor, &ScanRequestor));
// Test to ensure tokens with no privileges will disallow operations.
TEST_F(AuthzTokenTest, TestUnprivilegedAuthzTokens) {
// Replace the token used by the client with one that has no permissions.
// Since the token is well-formed, but does not have the sufficient
// privileges to perform the actions, the client going back to the master
// for a new token will not work, and the user will see an error.
LOG(INFO) << "Trying to use an unprivileged token...";
SignedTokenPB unprivileged_token;
TablePrivilegePB no_privilege;
// NOTE(review): the start of the following call (presumably generating a token
// for kUser with the master's token signer), and any setup of 'no_privilege'
// (e.g. its table ID), are not visible.
kUser, std::move(no_privilege), &unprivileged_token));
StoreAuthzToken(client_.get(), table_id_, unprivileged_token);
shared_ptr<KuduSession> bad_session(client_->NewSession());
ASSERT_OK(InsertKeyToTable(client_table_.get(), bad_session.get(), next_row_key_++));
// Write sessions will accumulate a bunch of non-authorized errors, veiling
// them in an IOError.
Status s = bad_session->Flush();
ASSERT_TRUE(s.IsIOError()) << s.ToString();
// Each per-operation error should be a remote "Not authorized" error.
// NOTE(review): this loop passes vacuously if 'errors' is empty; consider also
// asserting that it is non-empty.
vector<Status> errors = GetSessionErrors(bad_session.get());
for (const auto& e : errors) {
ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
// Scans will return a remote error with an appropriate message.
s = ScanFromTable(client_table_.get());
ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
// Test that ensures we retry when we send a token signed by a TSK that hasn't
// percolated to the tservers. In such cases, the tablet server should respond
// with an ERROR_UNAVAILABLE error and the request should be retried.
TEST_F(AuthzTokenTest, TestUnknownTsk) {
// Create a TSK with a high enough sequence number that it will be unknown to
// the server.
TokenSigningPrivateKeyPB tsk;
PrivateKey private_key;
int key_size = UseLargeKeys() ? 2048 : 512;
ASSERT_OK(GeneratePrivateKey(key_size, &private_key));
string private_key_str_der;
ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
// NOTE(review): 'private_key_str_der' is never stored into 'tsk', and no
// sequence number is set on it in the visible code, despite the comment above.
// Confirm that the corresponding setters weren't dropped.
// The TSK expires an hour from now.
tsk.set_expire_unix_epoch_seconds(WallTime_Now() + 3600);
// NOTE(review): no fields of 'privilege' are set in the visible code.
TablePrivilegePB privilege;
// Create a token signer to use our surprise TSK. The intervals don't matter.
TokenSigner signer(100, 100, 100);
ASSERT_OK(signer.ImportKeys({ tsk }));
SignedTokenPB token;
ASSERT_OK(signer.GenerateAuthzToken(kUser, std::move(privilege), &token));
StoreAuthzToken(client_.get(), table_id_, token);
// The operations will see ERROR_UNAVAILABLE and keep retrying, hoping for
// the TSK to make its way to the server.
Status s = ScanFromTable(client_table_.get());
LOG(INFO) << s.ToString();
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
s = InsertToTableSessionError(client_table_.get());
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// Only after we import the TSK do operations succeed.
// NOTE(review): no operation is retried after this import in the visible code,
// so the claim above is not actually checked here.
ASSERT_OK(cluster_->mini_master()->master()->token_signer()->ImportKeys({ tsk }));
// Test what happens when the single-master deployment responds with a
// retriable error when getting a new authz token.
TEST_F(AuthzTokenTest, TestSingleMasterUnavailable) {
// We sleep in this test to ensure our scan has time to retry.
// Warm the client's meta cache so it doesn't need to go to the master for
// the location of the tserver.
// NOTE(review): no statement follows either comment above in the visible code
// (e.g. a slow-test guard or a warm-up write) -- confirm nothing was dropped.
// Set up the client such that its first operation will require it to go back
// to the master (in this case, by giving it a token for the wrong user).
shared_ptr<KuduClient> bad_client;
// NOTE(review): this literal has the same value as kBadUser; prefer the constant.
ASSERT_OK(CreateClientForUser("bad-token-user", &bad_client));
shared_ptr<KuduTable> bad_table;
ASSERT_OK(bad_client->OpenTable(kTableName, &bad_table));
SignedTokenPB bad_token;
ASSERT_TRUE(FetchCachedAuthzToken(bad_client.get(), table_id_, &bad_token));
// Hand the other user's token to the default client.
StoreAuthzToken(client_.get(), table_id_, bad_token);
// Take the leader lock on the master, which will prevent successful attempts
// to get a new token, but will allow retries.
// NOTE(review): the lock's constructor arguments are not visible.
std::unique_lock<RWMutex> leader_lock(
// After a while, the client operation will time out.
Status s = ScanFromTable(client_table_.get());
LOG(INFO) << s.ToString();
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// If we let go of the leader lock while the operation is still in flight,
// the operation should succeed. At this point, the client still shouldn't
// have an authz token.
// NOTE(review): the lambda below writes 's' by reference from another thread.
// The visible code never releases 'leader_lock', joins 'scanner', or checks
// 's' afterwards; destroying a joinable std::thread calls std::terminate().
thread scanner([&] {
s = ScanFromTable(client_table_.get());
// Wait for a full RPC timeout to make the scan retry once more before
// letting go of the leader lock.
SleepFor(MonoDelta::FromSeconds(kRpcTimeoutSecs + 1));
// Test with utilities to prevent the master(s) from providing authz tokens.
// The test is also configured such that the masters will frequently change
// leadership.
class BadMultiMasterAuthzTokenTest : public AuthzTokenTest {
// Configures frequent leader elections, creates a three-master cluster, and
// enables access control on the tablet servers.
// NOTE(review): an access specifier and the closing braces of this class
// appear to be missing; cluster start-up and SetupClientAndTable() are also not
// visible in this SetUp().
void SetUp() override {
// We're going to make elections more frequent, so set some non-runtime
// flags up front. The values for these and the below flags are chosen to
// not be flaky, even running with stress in TSAN mode.
FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
FLAGS_raft_heartbeat_interval_ms = 200;
InternalMiniClusterOptions opts;
opts.num_masters = 3;
cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
// Enforce access control, and set the rest of the election flags.
FLAGS_tserver_enforce_access_control = true;
FLAGS_raft_enable_pre_election = false;
// Inject consensus notification latency of a third of the heartbeat interval.
FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms / 3;
// Test what happens when the multimaster deployment undergoes frequent leader
// changes. Tokens should still be issued and failures to get a token should be
// retried.
TEST_F(BadMultiMasterAuthzTokenTest, TestMasterElectionStorms) {
// Set up the tablet servers such that they'll force the client to go back to
// the master for a new token.
FLAGS_tserver_inject_invalid_authz_token_ratio = 1.0;
// Despite the master leader elections, new tokens should be received.
// NOTE(review): no statement follows the comment above in the visible code.
// After a while, operations should time out because the authz tokens are all
// invalid. The scanner will enrich the returned status with the last error
// received from the tablet server.
Status s = ScanFromTable(client_table_.get());
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
// Do the same for inserts.
shared_ptr<KuduSession> session = client_->NewSession();
ASSERT_OK(InsertKeyToTable(client_table_.get(), session.get(), next_row_key_++));
// NOTE(review): the Flush() status is not asserted on in the visible code, and
// the loop over 'errors' below passes vacuously if there are none.
s = session->Flush();
vector<Status> errors = GetSessionErrors(session.get());
for (const auto& e : errors) {
ASSERT_TRUE(e.IsTimedOut()) << e.ToString();
// TODO(awong): refactor WriteRpc so it spits out the tserver error that
// caused it to attempt getting another token.
ASSERT_STR_CONTAINS(e.ToString(), "RetrieveAuthzToken timed out");
// Now ease up the error injection on the tserver to ensure the tokens we get
// are usable.
// NOTE(review): no operation follows this flag change in the visible code.
FLAGS_tserver_inject_invalid_authz_token_ratio = 0.5;
// Test in which the master does not support creating authz tokens.
class LegacyMasterAuthzTokenTest : public AuthzTokenTest {
// Disables authz token support on the master and access-control enforcement on
// the tablet servers, then creates a cluster with default options.
// NOTE(review): an access specifier, the closing braces, and cluster
// start-up / SetupClientAndTable() are not visible here.
void SetUp() override {
FLAGS_master_support_authz_tokens = false;
FLAGS_tserver_enforce_access_control = false;
cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
// Ensures the client can still communicate with servers that do not support
// authz tokens.
TEST_F(LegacyMasterAuthzTokenTest, TestAuthzTokensNotSupported) {
// Client should have no problems connecting to an old cluster.
// NOTE(review): no operation follows the comment above in the visible code.
// In the unexpected case that the tservers enforce access control but we
// have an old master, a scan will fail upon being asked to reacquire an
// authz token, learning it is not supported.
FLAGS_tserver_enforce_access_control = true;
Status s = ScanFromTable(client_table_.get());
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "does not support RetrieveAuthzToken");
// The same will happen for writes.
s = InsertToTableSessionError(client_table_.get());
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "does not support RetrieveAuthzToken");
} // namespace kudu