blob: 2e76d550e16475ad8c6573acc86a83347ccc4a13 [file] [log] [blame]
// Copyright 2014 Cloudera, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <boost/assign/list_of.hpp>
#include "kudu/client/client.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/mini_cluster.h"
#include "kudu/master/mini_master.h"
#include "kudu/tools/data_gen_util.h"
#include "kudu/tools/ksck_remote.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/test_util.h"
DECLARE_int32(heartbeat_interval_ms);
namespace kudu {
namespace tools {
using client::KuduColumnSchema;
using client::KuduInsert;
using client::KuduSession;
using client::KuduSchemaBuilder;
using client::KuduTable;
using client::KuduTableCreator;
using std::tr1::static_pointer_cast;
using std::tr1::shared_ptr;
using std::vector;
using std::string;
using strings::Substitute;
static const char *kTableName = "ksck-test-table";
class RemoteKsckTest : public KuduTest {
public:
RemoteKsckTest()
: random_(SeedRandom()) {
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
CHECK_OK(b.Build(&schema_));
}
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
// Speed up testing, saves about 700ms per TEST_F.
FLAGS_heartbeat_interval_ms = 10;
MiniClusterOptions opts;
opts.num_tablet_servers = 3;
mini_cluster_.reset(new MiniCluster(env_.get(), opts));
ASSERT_OK(mini_cluster_->Start());
master_rpc_addr_ = mini_cluster_->mini_master()->bound_rpc_addr();
// Connect to the cluster.
ASSERT_OK(client::KuduClientBuilder()
.add_master_server_addr(master_rpc_addr_.ToString())
.Build(&client_));
// Create one table.
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.num_replicas(3)
.split_rows(GenerateSplitRows())
.Create());
// Make sure we can open the table.
ASSERT_OK(client_->OpenTable(kTableName, &client_table_));
ASSERT_OK(RemoteKsckMaster::Build(master_rpc_addr_, &master_));
cluster_.reset(new KsckCluster(master_));
ksck_.reset(new Ksck(cluster_));
}
virtual void TearDown() OVERRIDE {
if (mini_cluster_) {
mini_cluster_->Shutdown();
mini_cluster_.reset();
}
KuduTest::TearDown();
}
// Writes rows to the table until the continue_writing flag is set to false.
//
// Public for use with boost::bind.
void GenerateRowWritesLoop(CountDownLatch* started_writing,
const AtomicBool& continue_writing,
Promise<Status>* promise) {
shared_ptr<KuduTable> table;
Status status;
status = client_->OpenTable(kTableName, &table);
if (!status.ok()) {
promise->Set(status);
}
shared_ptr<KuduSession> session(client_->NewSession());
session->SetTimeoutMillis(10000);
status = session->SetFlushMode(KuduSession::MANUAL_FLUSH);
if (!status.ok()) {
promise->Set(status);
}
for (uint64_t i = 0; continue_writing.Load(); i++) {
gscoped_ptr<KuduInsert> insert(table->NewInsert());
GenerateDataForRow(table->schema(), i, &random_, insert->mutable_row());
status = session->Apply(insert.release());
if (!status.ok()) {
promise->Set(status);
}
status = session->Flush();
if (!status.ok()) {
promise->Set(status);
}
started_writing->CountDown(1);
}
promise->Set(Status::OK());
}
protected:
// Generate a set of split rows for tablets used in this test.
vector<const KuduPartialRow*> GenerateSplitRows() {
vector<const KuduPartialRow*> split_rows;
vector<int> split_nums = boost::assign::list_of(33)(66);
BOOST_FOREACH(int i, split_nums) {
KuduPartialRow* row = schema_.NewRow();
CHECK_OK(row->SetInt32(0, i));
split_rows.push_back(row);
}
return split_rows;
}
Status GenerateRowWrites(uint64_t num_rows) {
shared_ptr<KuduTable> table;
RETURN_NOT_OK(client_->OpenTable(kTableName, &table));
shared_ptr<KuduSession> session(client_->NewSession());
session->SetTimeoutMillis(10000);
RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
for (uint64_t i = 0; i < num_rows; i++) {
VLOG(1) << "Generating write for row id " << i;
gscoped_ptr<KuduInsert> insert(table->NewInsert());
GenerateDataForRow(table->schema(), i, &random_, insert->mutable_row());
RETURN_NOT_OK(session->Apply(insert.release()));
if (i > 0 && i % 1000 == 0) {
RETURN_NOT_OK(session->Flush());
}
}
RETURN_NOT_OK(session->Flush());
return Status::OK();
}
shared_ptr<Ksck> ksck_;
shared_ptr<client::KuduClient> client_;
private:
Sockaddr master_rpc_addr_;
shared_ptr<MiniCluster> mini_cluster_;
client::KuduSchema schema_;
shared_ptr<client::KuduTable> client_table_;
shared_ptr<KsckMaster> master_;
shared_ptr<KsckCluster> cluster_;
Random random_;
};
TEST_F(RemoteKsckTest, TestMasterOk) {
ASSERT_OK(ksck_->CheckMasterRunning());
}
TEST_F(RemoteKsckTest, TestTabletServersOk) {
LOG(INFO) << "Fetching table and tablet info...";
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
LOG(INFO) << "Checking tablet servers are running...";
ASSERT_OK(ksck_->CheckTabletServersRunning());
}
TEST_F(RemoteKsckTest, TestTableConsistency) {
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
deadline.AddDelta(MonoDelta::FromSeconds(30));
Status s;
while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
s = ksck_->CheckTablesConsistency();
if (s.ok()) {
break;
}
SleepFor(MonoDelta::FromMilliseconds(10));
}
ASSERT_OK(s);
}
TEST_F(RemoteKsckTest, TestChecksum) {
uint64_t num_writes = 100;
LOG(INFO) << "Generating row writes...";
ASSERT_OK(GenerateRowWrites(num_writes));
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
deadline.AddDelta(MonoDelta::FromSeconds(30));
Status s;
while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
s = ksck_->ChecksumData(vector<string>(),
vector<string>(),
ChecksumOptions(MonoDelta::FromSeconds(1), 16, false, 0));
if (s.ok()) {
break;
}
SleepFor(MonoDelta::FromMilliseconds(10));
}
ASSERT_OK(s);
}
TEST_F(RemoteKsckTest, TestChecksumTimeout) {
uint64_t num_writes = 10000;
LOG(INFO) << "Generating row writes...";
ASSERT_OK(GenerateRowWrites(num_writes));
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
// Use an impossibly low timeout value of zero!
Status s = ksck_->ChecksumData(vector<string>(),
vector<string>(),
ChecksumOptions(MonoDelta::FromNanoseconds(0), 16, false, 0));
ASSERT_TRUE(s.IsTimedOut()) << "Expected TimedOut Status, got: " << s.ToString();
}
TEST_F(RemoteKsckTest, TestChecksumSnapshot) {
CountDownLatch started_writing(1);
AtomicBool continue_writing(true);
Promise<Status> promise;
scoped_refptr<Thread> writer_thread;
Thread::Create("RemoteKsckTest", "TestChecksumSnapshot",
&RemoteKsckTest::GenerateRowWritesLoop, this,
&started_writing, boost::cref(continue_writing), &promise,
&writer_thread);
CHECK(started_writing.WaitFor(MonoDelta::FromSeconds(30)));
uint64_t ts = client_->GetLatestObservedTimestamp();
MonoTime start(MonoTime::Now(MonoTime::FINE));
MonoTime deadline = start;
deadline.AddDelta(MonoDelta::FromSeconds(30));
Status s;
// TODO: We need to loop here because safe time is not yet implemented.
// Remove this loop when that is done. See KUDU-1056.
while (true) {
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
Status s = ksck_->ChecksumData(vector<string>(), vector<string>(),
ChecksumOptions(MonoDelta::FromSeconds(10), 16, true, ts));
if (s.ok()) break;
if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) break;
SleepFor(MonoDelta::FromMilliseconds(10));
}
if (!s.ok()) {
LOG(WARNING) << Substitute("Timed out after $0 waiting for ksck to become consistent on TS $1. "
"Status: $2",
MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).ToString(),
ts, s.ToString());
EXPECT_OK(s); // To avoid ASAN complaints due to thread reading the CountDownLatch.
}
continue_writing.Store(false);
ASSERT_OK(promise.Get());
writer_thread->Join();
}
// Test that followers & leader wait until safe time to respond to a snapshot
// scan at current timestamp. TODO: Safe time not yet implemented. See KUDU-1056.
TEST_F(RemoteKsckTest, DISABLED_TestChecksumSnapshotCurrentTimestamp) {
CountDownLatch started_writing(1);
AtomicBool continue_writing(true);
Promise<Status> promise;
scoped_refptr<Thread> writer_thread;
Thread::Create("RemoteKsckTest", "TestChecksumSnapshot",
&RemoteKsckTest::GenerateRowWritesLoop, this,
&started_writing, boost::cref(continue_writing), &promise,
&writer_thread);
CHECK(started_writing.WaitFor(MonoDelta::FromSeconds(30)));
ASSERT_OK(ksck_->FetchTableAndTabletInfo());
ASSERT_OK(ksck_->ChecksumData(vector<string>(), vector<string>(),
ChecksumOptions(MonoDelta::FromSeconds(10), 16, true,
ChecksumOptions::kCurrentTimestamp)));
continue_writing.Store(false);
ASSERT_OK(promise.Get());
writer_thread->Join();
}
} // namespace tools
} // namespace kudu