| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <iterator> |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client-test-util.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/value.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduColumnSchema; |
| using kudu::client::KuduColumnStorageAttributes; |
| using kudu::client::KuduError; |
| using kudu::client::KuduScanner; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableAlterer; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::KuduValue; |
| using kudu::client::KuduWriteOperation; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalMiniCluster; |
| using kudu::cluster::ExternalMiniClusterOptions; |
| using std::make_pair; |
| using std::map; |
| using std::pair; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| using strings::SubstituteAndAppend; |
| |
| namespace kudu { |
| |
| const char* kTableName = "default.test_table"; |
| const int kMaxColumns = 30; |
| const uint32_t kMaxRangePartitions = 32; |
| const vector<KuduColumnStorageAttributes::CompressionType> kCompressionTypes = |
| { KuduColumnStorageAttributes::NO_COMPRESSION, |
| KuduColumnStorageAttributes::SNAPPY, |
| KuduColumnStorageAttributes::LZ4, |
| KuduColumnStorageAttributes::ZLIB }; |
| const vector <KuduColumnStorageAttributes::EncodingType> kInt32Encodings = |
| { KuduColumnStorageAttributes::PLAIN_ENCODING, |
| KuduColumnStorageAttributes::RLE, |
| KuduColumnStorageAttributes::BIT_SHUFFLE }; |
| // A block size of 0 applies the server-side default. |
| const vector<int32_t> kBlockSizes = {0, 2 * 1024 * 1024, |
| 4 * 1024 * 1024, 8 * 1024 * 1024}; |
| |
| // Parameterized based on HmsMode. |
| class AlterTableRandomized : public KuduTest, |
| public ::testing::WithParamInterface<HmsMode> { |
| public: |
| void SetUp() override { |
| KuduTest::SetUp(); |
| |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 3; |
| opts.hms_mode = GetParam(); |
| // This test produces tables with lots of columns. With container preallocation, |
| // we end up using quite a bit of disk space. So, we disable it. |
| opts.extra_tserver_flags.emplace_back("--log_container_preallocate_bytes=0"); |
| cluster_.reset(new ExternalMiniCluster(std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| |
| ASSERT_OK(cluster_->CreateClient(nullptr, &client_)); |
| } |
| |
| void TearDown() override { |
| cluster_->Shutdown(); |
| KuduTest::TearDown(); |
| } |
| |
| void RestartTabletServer(int idx) { |
| auto* ts = cluster_->tablet_server(idx); |
| LOG(INFO) << Substitute("Restarting TS $0 (index $1)", ts->uuid(), idx); |
| ts->Shutdown(); |
| CHECK_OK(ts->Restart()); |
| CHECK_OK(cluster_->WaitForTabletsRunning(ts, -1, MonoDelta::FromSeconds(60))); |
| LOG(INFO) << Substitute("TS $0 (index $1) restarted", ts->uuid(), idx); |
| } |
| |
| void RestartMaster() { |
| LOG(INFO) << "Restarting Master"; |
| cluster_->master()->Shutdown(); |
| CHECK_OK(cluster_->master()->Restart()); |
| CHECK_OK(cluster_->master()->WaitForCatalogManager()); |
| CHECK_OK(cluster_->WaitForTabletServerCount(3, MonoDelta::FromSeconds(60))); |
| LOG(INFO) << "Master Restarted"; |
| } |
| |
| protected: |
| unique_ptr<ExternalMiniCluster> cluster_; |
| shared_ptr<KuduClient> client_; |
| }; |
| |
| // Run the test with the HMS integration enabled and disabled. |
| INSTANTIATE_TEST_SUITE_P(HmsConfigurations, AlterTableRandomized, ::testing::ValuesIn( |
| vector<HmsMode> { HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION } |
| )); |
| |
| struct RowState { |
| // We use this special value to denote NULL values. |
| // We ensure that we never insert or update to this value except in the case of |
| // NULLable columns. |
| static const int32_t kNullValue = 0xdeadbeef; |
| // We use this special value to denote default values. |
| // We ensure that we never insert or update to this value except when the |
| // column should be assigned its current default value. |
| static const int32_t kDefaultValue = 0xbabecafe; |
| vector<pair<string, int32_t>> cols; |
| |
| string ToString() const { |
| string ret = "("; |
| bool first = true; |
| for (const auto& e : cols) { |
| if (!first) { |
| ret.append(", "); |
| } |
| first = false; |
| if (e.second == kNullValue) { |
| SubstituteAndAppend(&ret, "int32 $0=$1", e.first, "NULL"); |
| } else { |
| SubstituteAndAppend(&ret, "int32 $0=$1", e.first, e.second); |
| } |
| } |
| ret.push_back(')'); |
| return ret; |
| } |
| }; |
| |
| struct TableState { |
| TableState() |
| : rand_(SeedRandom()) { |
| col_names_.emplace_back("key"); |
| col_nullable_.push_back(false); |
| col_defaults_.push_back(0); |
| AddRangePartition(); |
| } |
| |
| int32_t GetRandomNewRowKey() { |
| CHECK(!range_partitions_.empty()); |
| while (true) { |
| auto partition = range_partitions_.begin(); |
| std::advance(partition, rand_.Uniform(range_partitions_.size())); |
| |
| int32_t rowkey = partition->first + rand_.Uniform(partition->second - partition->first); |
| if (!ContainsKey(rows_, rowkey)) { |
| return rowkey; |
| } |
| } |
| } |
| |
| string GetRandomNewColumnName() { |
| while (true) { |
| string name = Substitute("c$0", rand_.Uniform(1000)); |
| if (std::find(col_names_.begin(), col_names_.end(), name) == col_names_.end()) { |
| return name; |
| } |
| } |
| } |
| |
| // Returns the name of a random column not in the primary key. |
| string GetRandomExistingColumnName() { |
| CHECK(col_names_.size() > 1); |
| return col_names_[1 + rand_.Uniform(col_names_.size() - 1)]; |
| } |
| |
| int32_t GetRandomExistingRowKey() { |
| CHECK(!rows_.empty()); |
| |
| auto row = rows_.begin(); |
| std::advance(row, rand_.Uniform(rows_.size())); |
| return row->first; |
| } |
| |
| // Generates a random row. |
| void GenRandomRow(vector<pair<string, int32_t>>* row) { |
| int32_t key = GetRandomNewRowKey(); |
| |
| int32_t seed = rand_.Next(); |
| if (seed == RowState::kNullValue || seed == RowState::kDefaultValue) { |
| seed++; |
| } |
| |
| row->clear(); |
| row->push_back(make_pair(col_names_[0], key)); |
| for (int i = 1; i < col_names_.size(); i++) { |
| int32_t val; |
| if (col_nullable_[i] && seed % 2 == 1) { |
| val = RowState::kNullValue; |
| } else if (seed % 3 == 0) { |
| val = RowState::kDefaultValue; |
| } else { |
| val = seed; |
| } |
| row->push_back(make_pair(col_names_[i], val)); |
| } |
| } |
| |
| bool Insert(const vector<pair<string, int32_t>>& data) { |
| DCHECK_EQ(col_names_[0], data[0].first); |
| int32_t key = data[0].second; |
| if (ContainsKey(rows_, key)) return false; |
| |
| auto r = new RowState; |
| r->cols = data; |
| for (int i = 1; i < r->cols.size(); i++) { |
| if (r->cols[i].second == RowState::kDefaultValue) { |
| r->cols[i].second = col_defaults_[i]; |
| } |
| } |
| rows_[key].reset(r); |
| return true; |
| } |
| |
| bool Update(const vector<pair<string, int32_t>>& data) { |
| DCHECK_EQ(col_names_[0], data[0].first); |
| int32_t key = data[0].second; |
| if (!ContainsKey(rows_, key)) return false; |
| |
| rows_[key]->cols = data; |
| return true; |
| } |
| |
| void Delete(int32_t row_key) { |
| unique_ptr<RowState> r = EraseKeyReturnValuePtr(&rows_, row_key); |
| CHECK(r) << "row key " << row_key << " not found"; |
| } |
| |
| void AddColumnWithDefault(const string& name, int32_t def, bool nullable) { |
| col_names_.push_back(name); |
| col_nullable_.push_back(nullable); |
| col_defaults_.push_back(def); |
| for (auto& e : rows_) { |
| e.second->cols.push_back(make_pair(name, def)); |
| } |
| } |
| |
| void DropColumn(const string& name) { |
| auto col_it = std::find(col_names_.begin(), col_names_.end(), name); |
| int index = col_it - col_names_.begin(); |
| col_names_.erase(col_it); |
| col_nullable_.erase(col_nullable_.begin() + index); |
| col_defaults_.erase(col_defaults_.begin() + index); |
| for (auto& e : rows_) { |
| e.second->cols.erase(e.second->cols.begin() + index); |
| } |
| } |
| |
| void RenameColumn(const string& existing_name, const string& new_name) { |
| auto iter = std::find(col_names_.begin(), col_names_.end(), existing_name); |
| CHECK(iter != col_names_.end()); |
| int index = iter - col_names_.begin(); |
| for (auto& e : rows_) { |
| e.second->cols[index].first = new_name; |
| } |
| *iter = new_name; |
| } |
| |
| void ChangeDefault(const string& name, int32_t new_def) { |
| auto col_it = std::find(col_names_.begin(), col_names_.end(), name); |
| CHECK(col_it != col_names_.end()); |
| col_defaults_[col_it - col_names_.begin()] = new_def; |
| } |
| |
| pair<int32_t, int32_t> AddRangePartition() { |
| CHECK(range_partitions_.size() < kMaxRangePartitions); |
| while (true) { |
| uint32_t width = INT32_MAX / kMaxRangePartitions; |
| int32_t lower_bound = width * rand_.Uniform(kMaxRangePartitions); |
| int32_t upper_bound = lower_bound + width; |
| CHECK(upper_bound > lower_bound); |
| |
| if (InsertIfNotPresent(&range_partitions_, make_pair(lower_bound, upper_bound))) { |
| return make_pair(lower_bound, upper_bound); |
| } |
| } |
| } |
| |
| pair<int32_t, int32_t> DropRangePartition() { |
| CHECK(!range_partitions_.empty()); |
| |
| auto partition = range_partitions_.begin(); |
| std::advance(partition, rand_.Uniform(range_partitions_.size())); |
| |
| int32_t lower_bound = partition->first; |
| int32_t upper_bound = partition->second; |
| |
| range_partitions_.erase(partition); |
| |
| rows_.erase(rows_.lower_bound(lower_bound), rows_.lower_bound(upper_bound)); |
| return make_pair(lower_bound, upper_bound); |
| } |
| |
| void ToStrings(vector<string>* strs) { |
| strs->clear(); |
| for (const auto& e : rows_) { |
| strs->push_back(e.second->ToString()); |
| } |
| } |
| |
| // The name of each column. |
| vector<string> col_names_; |
| |
| // For each column, whether it is NULLable. |
| // Has the same length as col_names_. |
| vector<bool> col_nullable_; |
| |
| // For each column, its current write default. |
| // Has the same length as col_names_. |
| vector<int32_t> col_defaults_; |
| |
| map<int32_t, unique_ptr<RowState>> rows_; |
| |
| // The lower and upper bounds of all range partitions in the table. |
| map<int32_t, int32_t> range_partitions_; |
| |
| Random rand_; |
| }; |
| |
| struct MirrorTable { |
| explicit MirrorTable(shared_ptr<KuduClient> client) |
| : client_(std::move(client)) {} |
| |
| Status Create() { |
| KuduSchema schema; |
| KuduSchemaBuilder b; |
| b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); |
| CHECK_OK(b.Build(&schema)); |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| table_creator->table_name(kTableName) |
| .schema(&schema) |
| .set_range_partition_columns({ "key" }) |
| .num_replicas(3); |
| |
| for (const auto& partition : ts_.range_partitions_) { |
| unique_ptr<KuduPartialRow> lower(schema.NewRow()); |
| unique_ptr<KuduPartialRow> upper(schema.NewRow()); |
| RETURN_NOT_OK(lower->SetInt32("key", partition.first)); |
| RETURN_NOT_OK(upper->SetInt32("key", partition.second)); |
| table_creator->add_range_partition(lower.release(), upper.release()); |
| } |
| |
| return table_creator->Create(); |
| } |
| |
| void InsertRandomRow() { |
| vector<pair<string, int32_t>> row; |
| ts_.GenRandomRow(&row); |
| Status s = DoRealOp(row, INSERT); |
| if (s.IsAlreadyPresent()) { |
| CHECK(!ts_.Insert(row)) << "real table said already-present, fake table succeeded"; |
| } |
| CHECK_OK(s); |
| |
| CHECK(ts_.Insert(row)); |
| } |
| |
| void DeleteRandomRow() { |
| if (ts_.rows_.empty()) return; |
| int32_t row_key = ts_.GetRandomExistingRowKey(); |
| vector<pair<string, int32_t>> del; |
| del.push_back(make_pair(ts_.col_names_[0], row_key)); |
| CHECK_OK(DoRealOp(del, DELETE)); |
| |
| ts_.Delete(row_key); |
| } |
| |
| ATTRIBUTE_NO_SANITIZE_INTEGER |
| void UpdateRandomRow(uint32_t rand) { |
| if (ts_.rows_.empty()) return; |
| int32_t row_key = ts_.GetRandomExistingRowKey(); |
| |
| vector<pair<string, int32_t>> update; |
| update.push_back(make_pair(ts_.col_names_[0], row_key)); |
| for (int i = 1; i < num_columns(); i++) { |
| // This is expected to overflow. |
| int32_t val = rand * i; |
| if (val == RowState::kNullValue) val++; |
| if (val == RowState::kDefaultValue) val++; |
| if (ts_.col_nullable_[i] && val % 2 == 1) { |
| val = RowState::kNullValue; |
| } |
| update.push_back(make_pair(ts_.col_names_[i], val)); |
| } |
| |
| if (update.size() == 1) { |
| // No columns got updated. Just ignore this update. |
| return; |
| } |
| |
| Status s = DoRealOp(update, UPDATE); |
| if (s.IsNotFound()) { |
| CHECK(!ts_.Update(update)) << "real table said not-found, fake table succeeded"; |
| return; |
| } |
| CHECK_OK(s); |
| |
| CHECK(ts_.Update(update)); |
| } |
| |
| void RandomAlterTable() { |
| LOG(INFO) << "Beginning Alterations"; |
| |
| // This schema must outlive the table alterer, since the rows in add/drop |
| // range partition hold a pointer to it. |
| KuduSchema schema; |
| CHECK_OK(client_->GetTableSchema(kTableName, &schema)); |
| unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
| |
| int step_count = 1 + ts_.rand_.Uniform(10); |
| for (int step = 0; step < step_count; step++) { |
| int r = ts_.rand_.Uniform(9); |
| if (r < 1 && num_columns() < kMaxColumns) { |
| AddAColumn(table_alterer.get()); |
| } else if (r < 2 && num_columns() > 1) { |
| DropAColumn(table_alterer.get()); |
| } else if (num_range_partitions() == 0 || |
| (r < 3 && num_range_partitions() < kMaxRangePartitions)) { |
| AddARangePartition(schema, table_alterer.get()); |
| } else if (r < 4 && num_columns() > 1) { |
| RenameAColumn(table_alterer.get()); |
| } else if (r < 5 && num_columns() > 1) { |
| RenamePrimaryKeyColumn(table_alterer.get()); |
| } else if (r < 6 && num_columns() > 1) { |
| RenameAColumn(table_alterer.get()); |
| } else if (r < 7 && num_columns() > 1) { |
| ChangeADefault(table_alterer.get()); |
| } else if (r < 8 && num_columns() > 1) { |
| ChangeAStorageAttribute(table_alterer.get()); |
| } else { |
| DropARangePartition(schema, table_alterer.get()); |
| } |
| } |
| LOG(INFO) << "Committing Alterations"; |
| CHECK_OK(table_alterer->Alter()); |
| } |
| |
| void AddAColumn(KuduTableAlterer* table_alterer) { |
| string name = ts_.GetRandomNewColumnName(); |
| LOG(INFO) << "Adding column " << name << ", existing columns: " |
| << JoinStrings(ts_.col_names_, ", "); |
| |
| int32_t default_value = rand(); |
| bool nullable = rand() % 2 == 1; |
| |
| // Add to the real table. |
| if (nullable) { |
| default_value = RowState::kNullValue; |
| table_alterer->AddColumn(name)->Type(KuduColumnSchema::INT32); |
| } else { |
| table_alterer->AddColumn(name)->Type(KuduColumnSchema::INT32)->NotNull() |
| ->Default(KuduValue::FromInt(default_value)); |
| } |
| |
| // Add to the mirror state. |
| ts_.AddColumnWithDefault(name, default_value, nullable); |
| } |
| |
| void DropAColumn(KuduTableAlterer* table_alterer) { |
| string name = ts_.GetRandomExistingColumnName(); |
| LOG(INFO) << "Dropping column " << name << ", existing columns: " |
| << JoinStrings(ts_.col_names_, ", "); |
| table_alterer->DropColumn(name); |
| ts_.DropColumn(name); |
| } |
| |
| void RenameAColumn(KuduTableAlterer* table_alterer) { |
| string original_name = ts_.GetRandomExistingColumnName(); |
| string new_name = ts_.GetRandomNewColumnName(); |
| LOG(INFO) << "Renaming column " << original_name << " to " << new_name; |
| table_alterer->AlterColumn(original_name)->RenameTo(new_name); |
| ts_.RenameColumn(original_name, new_name); |
| } |
| |
| void RenamePrimaryKeyColumn(KuduTableAlterer* table_alterer) { |
| string new_name = ts_.GetRandomNewColumnName(); |
| LOG(INFO) << "Renaming PrimaryKey column " << ts_.col_names_[0] << " to " << new_name; |
| table_alterer->AlterColumn(ts_.col_names_[0])->RenameTo(new_name); |
| ts_.RenameColumn(ts_.col_names_[0], new_name); |
| } |
| |
| void ChangeADefault(KuduTableAlterer* table_alterer) { |
| string name = ts_.GetRandomExistingColumnName(); |
| int32_t new_def = ts_.rand_.Next(); |
| if (new_def == RowState::kNullValue || new_def == RowState::kDefaultValue) { |
| new_def++; |
| } |
| LOG(INFO) << "Changing default of column " << name << " to " << new_def; |
| table_alterer->AlterColumn(name)->Default(KuduValue::FromInt(new_def)); |
| ts_.ChangeDefault(name, new_def); |
| } |
| |
| void ChangeAStorageAttribute(KuduTableAlterer* table_alterer) { |
| string name = ts_.GetRandomExistingColumnName(); |
| int type = ts_.rand_.Uniform(3); |
| if (type == 0) { |
| int i = ts_.rand_.Uniform(kCompressionTypes.size()); |
| LOG(INFO) << "Changing compression of column " << name << |
| " to " << kCompressionTypes[i]; |
| table_alterer->AlterColumn(name)->Compression(kCompressionTypes[i]); |
| } else if (type == 1) { |
| int i = ts_.rand_.Uniform(kInt32Encodings.size()); |
| LOG(INFO) << "Changing encoding of column " << name << |
| " to " << kInt32Encodings[i]; |
| table_alterer->AlterColumn(name)->Encoding(kInt32Encodings[i]); |
| } else { |
| int i = ts_.rand_.Uniform(kBlockSizes.size()); |
| LOG(INFO) << "Changing block size of column " << name << |
| " to " << kBlockSizes[i]; |
| table_alterer->AlterColumn(name)->BlockSize(kBlockSizes[i]); |
| } |
| } |
| |
| void AddARangePartition(const KuduSchema& schema, KuduTableAlterer* table_alterer) { |
| auto bounds = ts_.AddRangePartition(); |
| LOG(INFO) << "Adding range partition: [" << bounds.first << ", " << bounds.second << ")" |
| << " resulting partitions: (" |
| << JoinKeysAndValuesIterator(ts_.range_partitions_.begin(), |
| ts_.range_partitions_.end(), |
| ", ", "], (") << ")"; |
| |
| KuduTableCreator::RangePartitionBound lower_bound_type = KuduTableCreator::INCLUSIVE_BOUND; |
| KuduTableCreator::RangePartitionBound upper_bound_type = KuduTableCreator::EXCLUSIVE_BOUND; |
| int32_t lower_bound_value = bounds.first; |
| int32_t upper_bound_value = bounds.second; |
| |
| if (ts_.rand_.OneIn(2) && lower_bound_value > INT32_MIN) { |
| lower_bound_type = KuduTableCreator::EXCLUSIVE_BOUND; |
| lower_bound_value -= 1; |
| } |
| |
| if (ts_.rand_.OneIn(2)) { |
| upper_bound_type = KuduTableCreator::INCLUSIVE_BOUND; |
| upper_bound_value -= 1; |
| } |
| |
| unique_ptr<KuduPartialRow> lower_bound(schema.NewRow()); |
| CHECK_OK(lower_bound->SetInt32(schema.Column(0).name(), lower_bound_value)); |
| unique_ptr<KuduPartialRow> upper_bound(schema.NewRow()); |
| CHECK_OK(upper_bound->SetInt32(schema.Column(0).name(), upper_bound_value)); |
| |
| table_alterer->AddRangePartition(lower_bound.release(), upper_bound.release(), |
| lower_bound_type, upper_bound_type); |
| } |
| |
| void DropARangePartition(KuduSchema& schema, KuduTableAlterer* table_alterer) { |
| auto bounds = ts_.DropRangePartition(); |
| LOG(INFO) << "Dropping range partition: [" << bounds.first << ", " << bounds.second << ")" |
| << " resulting partitions: (" |
| << JoinKeysAndValuesIterator(ts_.range_partitions_.begin(), |
| ts_.range_partitions_.end(), |
| ", ", "], (") << ")"; |
| |
| unique_ptr<KuduPartialRow> lower_bound(schema.NewRow()); |
| CHECK_OK(lower_bound->SetInt32(schema.Column(0).name(), bounds.first)); |
| unique_ptr<KuduPartialRow> upper_bound(schema.NewRow()); |
| CHECK_OK(upper_bound->SetInt32(schema.Column(0).name(), bounds.second)); |
| |
| table_alterer->DropRangePartition(lower_bound.release(), upper_bound.release()); |
| } |
| |
| int num_columns() const { |
| return ts_.col_names_.size(); |
| } |
| |
| int num_rows() const { |
| return ts_.rows_.size(); |
| } |
| |
| int num_range_partitions() const { |
| return ts_.range_partitions_.size(); |
| } |
| |
| void Verify() { |
| LOG(INFO) << "Verifying " << ts_.rows_.size() << " rows in " |
| << ts_.range_partitions_.size() << " tablets"; |
| // First scan the real table |
| vector<string> rows; |
| { |
| shared_ptr<KuduTable> table; |
| CHECK_OK(client_->OpenTable(kTableName, &table)); |
| KuduScanner scanner(table.get()); |
| ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY)); |
| ASSERT_OK(scanner.SetFaultTolerant()); |
| scanner.SetTimeoutMillis(60000); |
| ASSERT_OK(ScanToStrings(&scanner, &rows)); |
| } |
| |
| // Then get our mock table. |
| vector<string> expected; |
| ts_.ToStrings(&expected); |
| |
| // They should look the same. |
| ASSERT_EQ(expected, rows); |
| } |
| |
| private: |
| enum OpType { |
| INSERT, UPDATE, DELETE |
| }; |
| |
| Status DoRealOp(const vector<pair<string, int32_t>>& data, OpType op_type) { |
| VLOG(1) << "Applying op " << op_type << " " << data[0].first << ": " << data[0].second; |
| shared_ptr<KuduSession> session = client_->NewSession(); |
| shared_ptr<KuduTable> table; |
| RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| session->SetTimeoutMillis(60 * 1000); |
| RETURN_NOT_OK(client_->OpenTable(kTableName, &table)); |
| unique_ptr<KuduWriteOperation> op; |
| switch (op_type) { |
| case INSERT: op.reset(table->NewInsert()); break; |
| case UPDATE: op.reset(table->NewUpdate()); break; |
| case DELETE: op.reset(table->NewDelete()); break; |
| } |
| for (int i = 0; i < data.size(); i++) { |
| const auto& d = data[i]; |
| if (d.second == RowState::kNullValue) { |
| CHECK_OK(op->mutable_row()->SetNull(d.first)); |
| } else if (d.second == RowState::kDefaultValue) { |
| if (ts_.col_defaults_[i] == RowState::kNullValue) { |
| CHECK_OK(op->mutable_row()->SetNull(d.first)); |
| } else { |
| CHECK_OK(op->mutable_row()->SetInt32(d.first, ts_.col_defaults_[i])); |
| } |
| } else { |
| CHECK_OK(op->mutable_row()->SetInt32(d.first, d.second)); |
| } |
| } |
| RETURN_NOT_OK(session->Apply(op.release())); |
| Status s = session->Flush(); |
| if (s.ok()) { |
| return s; |
| } |
| |
| std::vector<KuduError*> errors; |
| ElementDeleter d(&errors); |
| bool overflow; |
| session->GetPendingErrors(&errors, &overflow); |
| CHECK_EQ(errors.size(), 1); |
| return errors[0]->status(); |
| } |
| |
| shared_ptr<KuduClient> client_; |
| TableState ts_; |
| }; |
| |
| // Stress test for various alter table scenarios. This performs a random |
| // sequence of the following operations: |
| // - Restart the master |
| // - Restart a random tablet server |
| // - Alter the table |
| // - Add a column |
| // - Drop a column |
| // - Add a range partition |
| // - Drop a range partition |
| // - Rename a column (including primary key columns) |
| // - Change a column's default value |
| // - Change the encoding or compression of a column |
| // - Insert a row (using the latest schema) |
| // - Delete a random row |
| // - Update a row (all columns with the latest schema) |
| // |
| // During the sequence of operations, a "mirror" of the table in memory is kept |
| // up to date. We periodically scan the actual table, and ensure that the data |
| // in Kudu matches our in-memory "mirror". |
| TEST_P(AlterTableRandomized, TestRandomSequence) { |
| MirrorTable t(client_); |
| ASSERT_OK(t.Create()); |
| |
| Random rng(SeedRandom()); |
| |
| const int n_iters = AllowSlowTests() ? 2000 : 1000; |
| for (int i = 0; i < n_iters; i++) { |
| // Perform different operations with varying probability. |
| // We mostly insert and update, with occasional deletes, |
| // and more occasional table alterations or restarts. |
| |
| int r = rng.Uniform(1000); |
| |
| if (r < 3) { |
| RestartMaster(); |
| } else if (r < 10) { |
| RestartTabletServer(rng.Uniform(cluster_->num_tablet_servers())); |
| } else if (r < 35 || t.num_range_partitions() == 0) { |
| t.RandomAlterTable(); |
| } else if (r < 500) { |
| t.InsertRandomRow(); |
| } else if (r < 750) { |
| t.DeleteRandomRow(); |
| } else { |
| t.UpdateRandomRow(rng.Next()); |
| } |
| |
| if (i % 1000 == 0) { |
| NO_FATALS(t.Verify()); |
| } |
| } |
| |
| NO_FATALS(t.Verify()); |
| |
| // Not only should the data returned by a scanner match what we expect, |
| // we also expect all of the replicas to agree with each other. |
| LOG(INFO) << "Verifying cluster"; |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| } |
| |
| } // namespace kudu |