blob: cbf1bfdf4c71008009cfa13d6957091ddd494365 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <map>
#include <vector>
#include "kudu/client/client-test-util.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/external_mini_cluster.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/test_util.h"
namespace kudu {
using client::KuduClient;
using client::KuduClientBuilder;
using client::KuduColumnSchema;
using client::KuduError;
using client::KuduInsert;
using client::KuduSchema;
using client::KuduSchemaBuilder;
using client::KuduSession;
using client::KuduTable;
using client::KuduTableAlterer;
using client::KuduTableCreator;
using client::KuduValue;
using client::KuduWriteOperation;
using client::sp::shared_ptr;
using std::make_pair;
using std::map;
using std::pair;
using std::vector;
using strings::SubstituteAndAppend;
const char* kTableName = "test-table";
const int kMaxColumns = 30;
class AlterTableRandomized : public KuduTest {
public:
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = 3;
// Because this test performs a lot of alter tables, we end up flushing
// and rewriting metadata files quite a bit. Globally disabling fsync
// speeds the test runtime up dramatically.
opts.extra_tserver_flags.push_back("--never_fsync");
// This test produces tables with lots of columns. With container preallocation,
// we end up using quite a bit of disk space. So, we disable it.
opts.extra_tserver_flags.push_back("--log_container_preallocate_bytes=0");
cluster_.reset(new ExternalMiniCluster(opts));
ASSERT_OK(cluster_->Start());
KuduClientBuilder builder;
ASSERT_OK(cluster_->CreateClient(builder, &client_));
}
virtual void TearDown() OVERRIDE {
cluster_->Shutdown();
KuduTest::TearDown();
}
void RestartTabletServer(int idx) {
LOG(INFO) << "Restarting TS " << idx;
cluster_->tablet_server(idx)->Shutdown();
CHECK_OK(cluster_->tablet_server(idx)->Restart());
CHECK_OK(cluster_->WaitForTabletsRunning(cluster_->tablet_server(idx),
MonoDelta::FromSeconds(60)));
}
protected:
gscoped_ptr<ExternalMiniCluster> cluster_;
shared_ptr<KuduClient> client_;
};
struct RowState {
// We use this special value to denote NULL values.
// We ensure that we never insert or update to this value except in the case of
// NULLable columns.
static const int32_t kNullValue = 0xdeadbeef;
vector<pair<string, int32_t> > cols;
string ToString() const {
string ret = "(";
typedef pair<string, int32_t> entry;
bool first = true;
for (const entry& e : cols) {
if (!first) {
ret.append(", ");
}
first = false;
if (e.second == kNullValue) {
SubstituteAndAppend(&ret, "int32 $0=$1", e.first, "NULL");
} else {
SubstituteAndAppend(&ret, "int32 $0=$1", e.first, e.second);
}
}
ret.push_back(')');
return ret;
}
};
struct TableState {
TableState() {
col_names_.push_back("key");
col_nullable_.push_back(false);
}
~TableState() {
STLDeleteValues(&rows_);
}
void GenRandomRow(int32_t key, int32_t seed,
vector<pair<string, int32_t> >* row) {
if (seed == RowState::kNullValue) {
seed++;
}
row->clear();
row->push_back(make_pair("key", key));
for (int i = 1; i < col_names_.size(); i++) {
int32_t val;
if (col_nullable_[i] && seed % 2 == 1) {
val = RowState::kNullValue;
} else {
val = seed;
}
row->push_back(make_pair(col_names_[i], val));
}
}
bool Insert(const vector<pair<string, int32_t> >& data) {
DCHECK_EQ("key", data[0].first);
int32_t key = data[0].second;
if (ContainsKey(rows_, key)) return false;
auto r = new RowState;
r->cols = data;
rows_[key] = r;
return true;
}
bool Update(const vector<pair<string, int32_t> >& data) {
DCHECK_EQ("key", data[0].first);
int32_t key = data[0].second;
if (!ContainsKey(rows_, key)) return false;
RowState* r = rows_[key];
r->cols = data;
return true;
}
void Delete(int32_t row_key) {
RowState* r = EraseKeyReturnValuePtr(&rows_, row_key);
CHECK(r) << "row key " << row_key << " not found";
delete r;
}
void AddColumnWithDefault(const string& name, int32_t def, bool nullable) {
col_names_.push_back(name);
col_nullable_.push_back(nullable);
for (entry& e : rows_) {
e.second->cols.push_back(make_pair(name, def));
}
}
void DropColumn(const string& name) {
auto col_it = std::find(col_names_.begin(), col_names_.end(), name);
int index = col_it - col_names_.begin();
col_names_.erase(col_it);
col_nullable_.erase(col_nullable_.begin() + index);
for (entry& e : rows_) {
e.second->cols.erase(e.second->cols.begin() + index);
}
}
int32_t GetRandomRowKey(int32_t rand) {
CHECK(!rows_.empty());
int idx = rand % rows_.size();
map<int32_t, RowState*>::const_iterator it = rows_.begin();
for (int i = 0; i < idx; i++) {
++it;
}
return it->first;
}
void ToStrings(vector<string>* strs) {
strs->clear();
for (const entry& e : rows_) {
strs->push_back(e.second->ToString());
}
}
// The name of each column.
vector<string> col_names_;
// For each column, whether it is NULLable.
// Has the same length as col_names_.
vector<bool> col_nullable_;
typedef pair<const int32_t, RowState*> entry;
map<int32_t, RowState*> rows_;
};
struct MirrorTable {
explicit MirrorTable(shared_ptr<KuduClient> client)
: client_(std::move(client)) {}
Status Create() {
KuduSchema schema;
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
CHECK_OK(b.Build(&schema));
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
RETURN_NOT_OK(table_creator->table_name(kTableName)
.schema(&schema)
.num_replicas(3)
.Create());
return Status::OK();
}
bool TryInsert(int32_t row_key, int32_t rand) {
vector<pair<string, int32_t> > row;
ts_.GenRandomRow(row_key, rand, &row);
Status s = DoRealOp(row, INSERT);
if (s.IsAlreadyPresent()) {
CHECK(!ts_.Insert(row)) << "real table said already-present, fake table succeeded";
return false;
}
CHECK_OK(s);
CHECK(ts_.Insert(row));
return true;
}
void DeleteRandomRow(uint32_t rand) {
if (ts_.rows_.empty()) return;
int32_t row_key = ts_.GetRandomRowKey(rand);
vector<pair<string, int32_t> > del;
del.push_back(make_pair("key", row_key));
CHECK_OK(DoRealOp(del, DELETE));
ts_.Delete(row_key);
}
void UpdateRandomRow(uint32_t rand) {
if (ts_.rows_.empty()) return;
int32_t row_key = ts_.GetRandomRowKey(rand);
vector<pair<string, int32_t> > update;
update.push_back(make_pair("key", row_key));
for (int i = 1; i < num_columns(); i++) {
int32_t val = rand * i;
if (val == RowState::kNullValue) val++;
if (ts_.col_nullable_[i] && val % 2 == 1) {
val = RowState::kNullValue;
}
update.push_back(make_pair(ts_.col_names_[i], val));
}
if (update.size() == 1) {
// No columns got updated. Just ignore this update.
return;
}
Status s = DoRealOp(update, UPDATE);
if (s.IsNotFound()) {
CHECK(!ts_.Update(update)) << "real table said not-found, fake table succeeded";
return;
}
CHECK_OK(s);
CHECK(ts_.Update(update));
}
void AddAColumn(const string& name) {
int32_t default_value = rand();
bool nullable = rand() % 2 == 1;
// Add to the real table.
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
if (nullable) {
default_value = RowState::kNullValue;
table_alterer->AddColumn(name)->Type(KuduColumnSchema::INT32);
} else {
table_alterer->AddColumn(name)->Type(KuduColumnSchema::INT32)->NotNull()
->Default(KuduValue::FromInt(default_value));
}
ASSERT_OK(table_alterer->Alter());
// Add to the mirror state.
ts_.AddColumnWithDefault(name, default_value, nullable);
}
void DropAColumn(const string& name) {
gscoped_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
CHECK_OK(table_alterer->DropColumn(name)->Alter());
ts_.DropColumn(name);
}
void DropRandomColumn(int seed) {
if (num_columns() == 1) return;
string name = ts_.col_names_[1 + (seed % (num_columns() - 1))];
DropAColumn(name);
}
int num_columns() const {
return ts_.col_names_.size();
}
void Verify() {
// First scan the real table
vector<string> rows;
{
shared_ptr<KuduTable> table;
CHECK_OK(client_->OpenTable(kTableName, &table));
client::ScanTableToStrings(table.get(), &rows);
}
std::sort(rows.begin(), rows.end());
// Then get our mock table.
vector<string> expected;
ts_.ToStrings(&expected);
// They should look the same.
ASSERT_EQ(rows, expected);
}
private:
enum OpType {
INSERT, UPDATE, DELETE
};
Status DoRealOp(const vector<pair<string, int32_t> >& data,
OpType op_type) {
shared_ptr<KuduSession> session = client_->NewSession();
shared_ptr<KuduTable> table;
RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
session->SetTimeoutMillis(15 * 1000);
RETURN_NOT_OK(client_->OpenTable(kTableName, &table));
gscoped_ptr<KuduWriteOperation> op;
switch (op_type) {
case INSERT: op.reset(table->NewInsert()); break;
case UPDATE: op.reset(table->NewUpdate()); break;
case DELETE: op.reset(table->NewDelete()); break;
}
for (const auto& d : data) {
if (d.second == RowState::kNullValue) {
CHECK_OK(op->mutable_row()->SetNull(d.first));
} else {
CHECK_OK(op->mutable_row()->SetInt32(d.first, d.second));
}
}
RETURN_NOT_OK(session->Apply(op.release()));
Status s = session->Flush();
if (s.ok()) {
return s;
}
std::vector<KuduError*> errors;
ElementDeleter d(&errors);
bool overflow;
session->GetPendingErrors(&errors, &overflow);
CHECK_EQ(errors.size(), 1);
return errors[0]->status();
}
shared_ptr<KuduClient> client_;
TableState ts_;
};
// Stress test for various alter table scenarios. This performs a random sequence of:
// - insert a row (using the latest schema)
// - delete a random row
// - update a row (all columns with the latest schema)
// - add a new column
// - drop a column
// - restart the tablet server
//
// During the sequence of operations, a "mirror" of the table in memory is kept up to
// date. We periodically scan the actual table, and ensure that the data in Kudu
// matches our in-memory "mirror".
TEST_F(AlterTableRandomized, TestRandomSequence) {
MirrorTable t(client_);
ASSERT_OK(t.Create());
Random rng(SeedRandom());
const int n_iters = AllowSlowTests() ? 2000 : 1000;
for (int i = 0; i < n_iters; i++) {
// Perform different operations with varying probability.
// We mostly insert and update, with occasional deletes,
// and more occasional table alterations or restarts.
int r = rng.Uniform(1000);
if (r < 400) {
t.TryInsert(1000000 + rng.Uniform(1000000), rng.Next());
} else if (r < 600) {
t.UpdateRandomRow(rng.Next());
} else if (r < 920) {
t.DeleteRandomRow(rng.Next());
} else if (r < 970) {
if (t.num_columns() < kMaxColumns) {
t.AddAColumn(strings::Substitute("c$0", i));
}
} else if (r < 995) {
t.DropRandomColumn(rng.Next());
} else {
RestartTabletServer(rng.Uniform(cluster_->num_tablet_servers()));
}
if (i % 1000 == 0) {
NO_FATALS(t.Verify());
}
}
NO_FATALS(t.Verify());
// Not only should the data returned by a scanner match what we expect,
// we also expect all of the replicas to agree with each other.
ClusterVerifier v(cluster_.get());
NO_FATALS(v.CheckCluster());
}
} // namespace kudu