blob: 5c52f861c4bc1ca8f9f338217e2c4f042d4ec3bc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <stdint.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/client.pb.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/key_value_test_schema.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_int32(heartbeat_interval_ms);
DECLARE_string(superuser_acl);
using std::string;
using std::unique_ptr;
using std::vector;
DECLARE_bool(enable_table_write_limit);
DECLARE_bool(tserver_enforce_access_control);
DECLARE_double(table_write_limit_ratio);
DECLARE_int32(log_segment_size_mb);
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(flush_threshold_secs);
DECLARE_int32(tablet_history_max_age_sec);
DECLARE_int64(authz_token_validity_seconds);
DECLARE_int64(table_disk_size_limit);
DECLARE_int64(table_row_count_limit);
namespace kudu {
using cluster::InternalMiniCluster;
using cluster::InternalMiniClusterOptions;
using client::AuthenticationCredentialsPB;
using client::sp::shared_ptr;
using client::KuduClient;
using client::KuduClientBuilder;
using client::KuduDelete;
using client::KuduError;
using client::KuduInsert;
using client::KuduScanner;
using client::KuduSchema;
using client::KuduSession;
using client::KuduTable;
using client::KuduTableAlterer;
using client::KuduTableCreator;
using client::KuduTableStatistics;
using client::KuduUpdate;
namespace {
// Per-RPC timeout, in seconds. Deliberately short so that tests which expect
// an "invalid token" error do not have to wait long for it.
constexpr int kRpcTimeoutSecs = 3;
// Timeout for a whole operation (e.g. a scan), in seconds: three RPC timeouts.
constexpr int kOperationTimeoutSecs = 3 * kRpcTimeoutSecs;
// Queues an INSERT of the row (key, key) into the given key-value table.
//
// The session is switched to MANUAL_FLUSH and the operation is only applied,
// not flushed: the caller is expected to call Flush() on 'session'.
Status InsertKeyToTable(KuduTable* table, KuduSession* session, int key) {
  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
  unique_ptr<KuduInsert> op(table->NewInsert());
  KuduPartialRow* const cells = op->mutable_row();
  // Columns 0 and 1 both receive 'key'.
  for (const int col_idx : {0, 1}) {
    RETURN_NOT_OK(cells->SetInt32(col_idx, key));
  }
  // The raw pointer is released to Apply().
  return session->Apply(op.release());
}
// Queues an UPDATE that sets column "val" to 'value' for the row whose "key"
// column equals 'key'.
//
// The session is switched to MANUAL_FLUSH and the operation is only applied,
// not flushed: the caller is expected to call Flush() on 'session'.
Status UpdateKeyToTable(KuduTable* table, KuduSession* session, int key, int value) {
  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
  unique_ptr<KuduUpdate> op(table->NewUpdate());
  KuduPartialRow* const cells = op->mutable_row();
  RETURN_NOT_OK(cells->SetInt32("key", key));
  RETURN_NOT_OK(cells->SetInt32("val", value));
  // The raw pointer is released to Apply().
  return session->Apply(op.release());
}
// Queues a DELETE of the row whose "key" column equals 'key'.
//
// The session is switched to MANUAL_FLUSH and the operation is only applied,
// not flushed: the caller is expected to call Flush() on 'session'.
Status DeleteKeyToTable(KuduTable* table, KuduSession* session, int key) {
  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
  unique_ptr<KuduDelete> op(table->NewDelete());
  KuduPartialRow* const cells = op->mutable_row();
  RETURN_NOT_OK(cells->SetInt32("key", key));
  // The raw pointer is released to Apply().
  return session->Apply(op.release());
}
// Drains the pending errors of 'session' and returns a copy of the status of
// each one, in the order reported by the session.
//
// The KuduError objects obtained from the session are deleted before this
// function returns; only the copied statuses are handed back to the caller.
vector<Status> GetSessionErrors(KuduSession* session) {
  vector<KuduError*> errors;
  // Arm the guard before the vector is populated so the raw KuduError
  // pointers are released on every path out of this function.
  ElementDeleter deleter(&errors);
  // The 'overflowed' out-parameter is not needed here, hence nullptr.
  session->GetPendingErrors(&errors, nullptr);

  vector<Status> ret;
  ret.reserve(errors.size());
  for (const KuduError* error : errors) {
    ret.push_back(error->status());
  }
  return ret;
}
// Scans 'table' without any predicates and appends the stringified rows to
// 'rows' (via ScanToStrings()). The scan uses kOperationTimeoutSecs as its
// timeout.
//
// Returns the first non-OK status from configuring or running the scan.
Status ScanWholeTable(KuduTable* table, vector<string>* rows) {
  KuduScanner scanner(table);
  // Propagate a failure to configure the scanner instead of silently scanning
  // with whatever timeout happens to be in effect.
  RETURN_NOT_OK(scanner.SetTimeoutMillis(kOperationTimeoutSecs * 1000));
  return ScanToStrings(&scanner, rows);
}
// Alters table 'table_name' through 'client', setting both its disk size
// limit and its row count limit in a single alter-table request.
//
// Returns the status of the Alter() call.
Status SetTableLimit(const string& table_name,
                     const shared_ptr<KuduClient>& client,
                     int64_t disk_size_limit,
                     int64_t row_count_limit) {
  unique_ptr<KuduTableAlterer> limit_change(client->NewTableAlterer(table_name));
  limit_change->SetTableDiskSizeLimit(disk_size_limit);
  limit_change->SetTableRowCountLimit(row_count_limit);
  return limit_change->Alter();
}
} // anonymous namespace
class DisableWriteWhenExceedingQuotaTest : public KuduTest {
public:
DisableWriteWhenExceedingQuotaTest()
: schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) {}
const char* const kTableName = "test-table";
const char* const kUser = "token-user";
const char* const kSuperUser = "super-user";
static const int64_t kRowCountLimit = 20;
void SetUp() override {
KuduTest::SetUp();
FLAGS_superuser_acl = kSuperUser;
FLAGS_tserver_enforce_access_control = true;
FLAGS_authz_token_validity_seconds = 1;
FLAGS_enable_table_write_limit = true;
FLAGS_table_row_count_limit = 2;
FLAGS_table_disk_size_limit = 1024 * 1024 * 2;
FLAGS_log_segment_size_mb = 1;
FLAGS_flush_threshold_mb = 0;
FLAGS_tablet_history_max_age_sec = 1;
FLAGS_flush_threshold_secs = 1;
// Create a table with a basic schema.
cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
ASSERT_OK(cluster_->Start());
ASSERT_OK(SetupClientAndTable());
}
// Sets up the client_ and client_table_ members.
Status SetupClient(const string& user) {
RETURN_NOT_OK(CreateClientForUser(user, &client_));
RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_));
table_id_ = client_table_->id();
return Status::OK();
}
// Sets up the client_ and client_table_ members.
Status SetupClientAndTable() {
RETURN_NOT_OK(CreateClientForUser(kUser, &client_));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
RETURN_NOT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.num_replicas(1)
.set_range_partition_columns({ "key" })
.Create());
RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_));
table_id_ = client_table_->id();
return Status::OK();
}
// Creates a client for the given user.
Status CreateClientForUser(const string& user, shared_ptr<KuduClient>* client) const {
// Many tests will expect operations to fail, so let's get there quicker by
// setting a low timeout.
KuduClientBuilder client_builder;
client_builder.default_rpc_timeout(MonoDelta::FromSeconds(kRpcTimeoutSecs));
string authn_creds;
AuthenticationCredentialsPB authn_pb;
authn_pb.set_real_user(user);
CHECK(authn_pb.SerializeToString(&authn_creds));
client_builder.import_authentication_credentials(std::move(authn_creds));
return cluster_->CreateClient(&client_builder, client);
}
// Wait for the tservers update table statisitcs to master.
static void WaitForTServerUpdatesStatisticsToMaster(int ms = FLAGS_heartbeat_interval_ms * 10) {
SleepFor(MonoDelta::FromMilliseconds(ms));
}
// Disable write privilege through authz token when exceeding size quota
void TestSizeLimit(int64_t row_count_limit = kRowCountLimit) {
shared_ptr<KuduSession> good_session(client_->NewSession());
Status s = Status::OK();
int k = 0;
for (k = 0; k < row_count_limit; k++) {
ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), k));
s = good_session->Flush();
if (!s.ok()) {
// break the loop once the write failed
break;
}
WaitForTServerUpdatesStatisticsToMaster(1000);
}
ASSERT_TRUE(s.IsIOError()) << s.ToString();
if (!s.ok()) {
vector<Status> errors = GetSessionErrors(good_session.get());
for (const auto& e : errors) {
ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
}
}
// Scanning still works.
vector<string> rows;
ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
ASSERT_EQ(rows.size(), k);
// Update also blocked
ASSERT_OK(UpdateKeyToTable(client_table_.get(), good_session.get(), 0, 1000));
s = good_session->Flush();
ASSERT_TRUE(s.IsIOError()) << s.ToString();
vector<Status> errors = GetSessionErrors(good_session.get());
for (const auto& e : errors) {
ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
}
// Delete operation is taken as UPDATE, which will also increase table size.
// So, remove all rows in order to reclaim the space
for (k--; k >= 0; k--) {
good_session = client_->NewSession();
ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), k));
ASSERT_OK(good_session->Flush());
WaitForTServerUpdatesStatisticsToMaster(1000);
}
// Suppose the table is empty
rows.clear();
ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
ASSERT_EQ(rows.size(), 0);
// Insertion should work again
ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), 0));
ASSERT_OK(good_session->Flush());
WaitForTServerUpdatesStatisticsToMaster(1000);
// Remove the just inserted row to clean the table
good_session = client_->NewSession();
ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), 0));
ASSERT_OK(good_session->Flush());
WaitForTServerUpdatesStatisticsToMaster(1000);
}
// Disable write privilege through authz token when exceeding rows quota
void TestRowLimit() {
// insert 2 rows to fill the quota
shared_ptr<KuduSession> good_session(client_->NewSession());
auto key = next_row_key_++;
ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key));
ASSERT_OK(good_session->Flush());
WaitForTServerUpdatesStatisticsToMaster();
ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key + 1));
Status s = good_session->Flush();
WaitForTServerUpdatesStatisticsToMaster();
// exceeds row quota and failed to update
shared_ptr<KuduSession> bad_session(client_->NewSession());
ASSERT_OK(InsertKeyToTable(client_table_.get(), bad_session.get(), key + 2));
s = bad_session->Flush();
ASSERT_TRUE(s.IsIOError()) << s.ToString();
vector<Status> errors = GetSessionErrors(bad_session.get());
for (const auto& e : errors) {
ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
}
ASSERT_OK(UpdateKeyToTable(client_table_.get(), bad_session.get(), key, key + 2));
s = bad_session->Flush();
ASSERT_TRUE(s.IsIOError()) << s.ToString();
errors = GetSessionErrors(bad_session.get());
for (const auto& e : errors) {
ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
}
// Scans still works.
vector<string> rows;
ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
ASSERT_EQ(rows.size(), 2);
// remove one row to avoid reaching quota
good_session = client_->NewSession();
ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key));
ASSERT_OK(good_session->Flush());
WaitForTServerUpdatesStatisticsToMaster();
// Check rows
rows.clear();
ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
ASSERT_EQ(rows.size(), 1);
// reinsert should be allowed
ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key + 3));
ASSERT_OK(good_session->Flush());
// empty the table by deleting all rows
ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key + 3));
ASSERT_OK(good_session->Flush());
WaitForTServerUpdatesStatisticsToMaster();
ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key + 1));
ASSERT_OK(good_session->Flush());
}
// change the table limit through admin user
void ModifyLimit(int64_t disk_size_limit, int64_t row_count_limit) {
ASSERT_OK(SetupClient(kSuperUser));
ASSERT_OK(SetTableLimit(kTableName, client_, disk_size_limit, row_count_limit));
}
protected:
const KuduSchema schema_;
unique_ptr<InternalMiniCluster> cluster_;
// Client authenticated as the default user.
shared_ptr<KuduClient> client_;
// Table created with 'client_'.
shared_ptr<KuduTable> client_table_;
string table_id_;
// The next row key to insert.
int next_row_key_ = 0;
};
// Refuse the table limit change if the alteration contains any other non-table-limit
// related operation, like renaming, changing owner, adding/altering/dropping column, etc.
TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisallowedToChangeLimitMixedWithOtherOps) {
  const int64_t on_disk_size_limit = 100L * 1024 * 1024;
  const int64_t row_count_limit = 10L;
  ASSERT_OK(SetupClient(kSuperUser));

  // Failed to change table disk size limit because it contains another
  // non-table-limit related operation: rename the table
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    Status s = alterer->RenameTo("failedRenaming")
                      ->SetTableDiskSizeLimit(on_disk_size_limit)
                      ->Alter();
    ASSERT_TRUE(s.IsConfigurationError()) << s.ToString();
  }

  // Failed to change the row count limit of the table because it contains
  // another non-table-limit related operation: set owner.
  //
  // A fresh alterer is used on purpose: reusing the previous one would also
  // resend the rename and the disk size limit queued above, so this case
  // would not be exercising the "set owner + row count limit" combination
  // on its own.
  {
    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
    Status s = alterer->SetOwner("failedSetOwner")
                      ->SetTableRowCountLimit(row_count_limit)
                      ->Alter();
    ASSERT_TRUE(s.IsConfigurationError()) << s.ToString();
  }

  // It is successful only changing the table 'limit'.
  ASSERT_OK(SetTableLimit(kTableName, client_, on_disk_size_limit, row_count_limit));

  // Restart the cluster and verify that the new limits are still reported.
  cluster_->Shutdown();
  ASSERT_OK(cluster_->Start());
  KuduTableStatistics* table_statistics = nullptr;
  ASSERT_OK(client_->GetTableStatistics(kTableName, &table_statistics));
  // Take ownership of the statistics object returned through the out-param.
  unique_ptr<KuduTableStatistics> statistics(table_statistics);
  ASSERT_EQ(on_disk_size_limit, statistics->on_disk_size_limit());
  ASSERT_EQ(row_count_limit, statistics->live_row_count_limit());
}
// Only the superuser may change the table limits: the same request is
// rejected as "not authorized" for the regular user and accepted for the
// superuser. The accepted limits must still be reported after a restart.
TEST_F(DisableWriteWhenExceedingQuotaTest, TestOnlySuperUserAllowedToChangeTableLimit) {
  const int64_t disk_limit_bytes = 100L * 1024 * 1024;
  const int64_t max_rows = 10L;

  // At this point client_ is still the regular user set up by the fixture.
  const Status denied = SetTableLimit(kTableName, client_, disk_limit_bytes, max_rows);
  ASSERT_TRUE(denied.IsNotAuthorized()) << denied.ToString();

  // The superuser can set the limits, both through the fixture helper and
  // through an explicit superuser client.
  NO_FATALS(ModifyLimit(disk_limit_bytes, max_rows));
  ASSERT_OK(SetupClient(kSuperUser));
  ASSERT_OK(SetTableLimit(kTableName, client_, disk_limit_bytes, max_rows));

  // Bounce the cluster, then read the limits back from the table statistics.
  cluster_->Shutdown();
  ASSERT_OK(cluster_->Start());

  KuduTableStatistics* raw_stats;
  ASSERT_OK(client_->GetTableStatistics(kTableName, &raw_stats));
  // Own the object handed back through the out-parameter.
  unique_ptr<KuduTableStatistics> stats(raw_stats);
  ASSERT_EQ(disk_limit_bytes, stats->on_disk_size_limit());
  ASSERT_EQ(max_rows, stats->live_row_count_limit());
}
// Runs the row-limit scenario twice: once against the freshly started
// cluster and once more after a cluster restart.
TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWritePrivilegeWhenExceedingRowsQuota) {
  for (int round = 0; round < 2; ++round) {
    if (round > 0) {
      // Second round: restart the cluster and reconnect as the regular user
      // before verifying the limit again.
      cluster_->Shutdown();
      ASSERT_OK(cluster_->Start());
      ASSERT_OK(SetupClient(kUser));
    }
    NO_FATALS(TestRowLimit());
  }
}
// Runs the disk-size-limit scenario twice: once right after the limits have
// been changed and once more after a cluster restart.
TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWritePrivilegeWhenExceedingSizeQuota) {
  // The row count limit is raised high enough for all the test rows to fit,
  // whereas the on-disk size limit is restrictive: it is the one meant to be
  // hit while the test rows are being inserted. The size limit is tuned to
  // leave room for at least one row after all the flush, compaction, and GC
  // activity triggered by TestSizeLimit(), taking into account the auxiliary
  // data tablet servers keep on disk.
  const int64_t max_rows = 100 * kRowCountLimit;
  const int64_t disk_limit_bytes = 1024 * (1024 + 110);
  NO_FATALS(ModifyLimit(disk_limit_bytes, max_rows));

  for (int round = 0; round < 2; ++round) {
    if (round > 0) {
      // Second round: restart the cluster to verify the limit again.
      cluster_->Shutdown();
      ASSERT_OK(cluster_->Start());
    }
    // (Re)connect as the regular user: ModifyLimit() and the restart both
    // precede this point.
    ASSERT_OK(SetupClient(kUser));
    NO_FATALS(TestSizeLimit(max_rows));
  }
}
// Runs both the row-limit and the size-limit scenarios with
// --table_write_limit_ratio set to 0.5.
TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWriteWhenExceedingRowsQuotaWithFactor) {
  FLAGS_table_write_limit_ratio = 0.5;
  // The cluster is restarted so that the new ratio takes effect.
  cluster_->Shutdown();
  ASSERT_OK(cluster_->Start());

  // Phase 1, row limit: generous disk size limit (10 MiB), row limit of 4.
  // NOTE(review): presumably the 0.5 ratio turns this into an effective
  // limit of 2 rows, which is what TestRowLimit() expects -- confirm.
  const int64_t row_phase_disk_limit = 10L * 1024 * 1024;
  const int64_t row_phase_row_limit = 4L;
  NO_FATALS(ModifyLimit(row_phase_disk_limit, row_phase_row_limit));
  // ModifyLimit() leaves client_ as the superuser; switch back.
  ASSERT_OK(SetupClient(kUser));
  NO_FATALS(TestRowLimit());

  // Phase 2, size limit: 2 MiB + 120 KiB on disk, default row count limit.
  const int64_t size_phase_disk_limit = 2L * 1024 * 1024 + 120L * 1024;
  NO_FATALS(ModifyLimit(size_phase_disk_limit, kRowCountLimit));
  // ModifyLimit() leaves client_ as the superuser; switch back.
  ASSERT_OK(SetupClient(kUser));
  NO_FATALS(TestSizeLimit());
}
} // namespace kudu