| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tools/ksck_remote.h" |
| |
| #include <cstdint> |
| #include <functional> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <thread> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/data_gen_util.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/rebalance/cluster_status.h" |
| #include "kudu/tools/ksck.h" |
| #include "kudu/tools/ksck_checksum.h" |
| #include "kudu/tools/ksck_results.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/util/atomic.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/promise.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_bool(checksum_scan); |
| DECLARE_bool(consensus); |
| DECLARE_int32(heartbeat_interval_ms); |
| DECLARE_int32(safe_time_max_lag_ms); |
| DECLARE_int32(scanner_max_wait_ms); |
| DECLARE_int32(tablet_history_max_age_sec); |
| DECLARE_int32(timestamp_update_period_ms); |
| DECLARE_int32(wait_before_setting_snapshot_timestamp_ms); |
| DECLARE_string(location_mapping_cmd); |
| |
| DEFINE_int32(experimental_flag_for_ksck_test, 0, |
| "A flag marked experimental so it can be used to test ksck's " |
| "unusual flag detection features"); |
| TAG_FLAG(experimental_flag_for_ksck_test, experimental); |
| |
| using kudu::client::KuduColumnSchema; |
| using kudu::client::KuduInsert; |
| using kudu::client::KuduScanToken; |
| using kudu::client::KuduScanTokenBuilder; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableAlterer; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tools { |
| |
// Name of the table created by the RemoteKsckTest fixture and referenced by
// most scenarios below. constexpr (the file already uses constexpr, e.g. for
// test constants) and pointer-to-const so the binding cannot be reseated.
static constexpr const char* const kTableName = "ksck-test-table";
| |
// Test fixture that stands up an in-process mini cluster with 3 masters and
// 3 tablet servers, creates a pre-split test table with replication factor 3,
// and wires up a Ksck instance whose textual output is captured in
// err_stream_ so the scenarios can assert on it.
class RemoteKsckTest : public KuduTest {
 public:
  // Builds the schema of the test table: (key INT32 PK, int_val INT32).
  RemoteKsckTest()
      : random_(SeedRandom()) {
    KuduSchemaBuilder b;
    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
    CHECK_OK(b.Build(&schema_));
  }

  virtual void SetUp() OVERRIDE {
    KuduTest::SetUp();

    // Speed up testing, saves about 700ms per TEST_F.
    FLAGS_heartbeat_interval_ms = 10;

    InternalMiniClusterOptions opts;

    // Multiple masters so master-consensus and leader fail-over scenarios can
    // be exercised; 3 tablet servers so RF=3 tablets can be fully replicated.
    opts.num_masters = 3;
    opts.num_tablet_servers = 3;
    mini_cluster_.reset(new InternalMiniCluster(env_, opts));
    ASSERT_OK(mini_cluster_->Start());

    // Connect to the cluster.
    ASSERT_OK(mini_cluster_->CreateClient(nullptr, &client_));

    // Create one table.
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    // split_rows() is deprecated in the client API, but it's the most
    // convenient way to pre-split the range here; silence the warning.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    ASSERT_OK(table_creator->table_name(kTableName)
              .schema(&schema_)
              .num_replicas(3)
              .set_range_partition_columns({ "key" })
              .split_rows(GenerateSplitRows())
              .Create());
#pragma GCC diagnostic pop
    // Make sure we can open the table.
    shared_ptr<KuduTable> client_table;
    ASSERT_OK(client_->OpenTable(kTableName, &client_table));

    // Point ksck at every master of the mini cluster.
    vector<string> master_addresses;
    master_addresses.reserve(opts.num_masters);
    for (int i = 0; i < mini_cluster_->num_masters(); i++) {
      master_addresses.push_back(
          mini_cluster_->mini_master(i)->bound_rpc_addr_str());
    }
    ASSERT_OK(RemoteKsckCluster::Build(master_addresses, &cluster_));
    ksck_.reset(new Ksck(cluster_, &err_stream_));
  }

  virtual void TearDown() OVERRIDE {
    // Shut the cluster down explicitly so shutdown problems surface here
    // rather than during destruction.
    if (mini_cluster_) {
      mini_cluster_->Shutdown();
      mini_cluster_.reset();
    }
    KuduTest::TearDown();
  }

  // Writes rows to the table until the continue_writing flag is set to false.
  //
  // 'started_writing' is counted down once enough rows have been written (or
  // unconditionally on exit, so a waiter is never blocked forever);
  // 'promise' always receives the final status of the write loop so the
  // caller can detect writer errors.
  void GenerateRowWritesLoop(CountDownLatch* started_writing,
                             const AtomicBool& continue_writing,
                             Promise<Status>* promise) {
    shared_ptr<KuduTable> table;
    Status status;
    // Publish the final status and unblock the waiter regardless of which
    // early 'return' below is taken.
    SCOPED_CLEANUP({
      promise->Set(status);
      started_writing->CountDown(1);
    });
    status = client_->OpenTable(kTableName, &table);
    if (!status.ok()) {
      return;
    }
    shared_ptr<KuduSession> session(client_->NewSession());
    session->SetTimeoutMillis(10000);
    status = session->SetFlushMode(KuduSession::MANUAL_FLUSH);
    if (!status.ok()) {
      return;
    }

    for (uint64_t i = 0; continue_writing.Load(); i++) {
      unique_ptr<KuduInsert> insert(table->NewInsert());
      GenerateDataForRow(table->schema(), i, &random_, insert->mutable_row());
      status = session->Apply(insert.release());
      if (!status.ok()) {
        return;
      }
      status = session->Flush();
      if (!status.ok()) {
        return;
      }
      // Wait for the first 100 writes so that it's very likely all replicas have committed a
      // message in each tablet; otherwise, safe time might not have been updated on all replicas
      // and some might refuse snapshot scans because of lag.
      // (Counting the latch down repeatedly after that point is harmless.)
      if (i > 100) {
        started_writing->CountDown(1);
      }
    }
  }

 protected:
  // Generate a set of split rows for tablets used in this test.
  // Produces num_tablets-1 split points at multiples of 10, yielding
  // 10 tablets when slow tests are allowed and 3 otherwise.
  vector<const KuduPartialRow*> GenerateSplitRows() {
    vector<const KuduPartialRow*> split_rows;
    int num_tablets = AllowSlowTests() ? 10 : 3;
    for (int i = 1; i < num_tablets; i++) {
      KuduPartialRow* row = schema_.NewRow();
      CHECK_OK(row->SetInt32(0, i * 10));
      split_rows.push_back(row);
    }
    return split_rows;
  }

  // Synchronously insert 'num_rows' generated rows into the test table,
  // flushing once at the end (AUTO_FLUSH_BACKGROUND handles intermediate
  // flushes).
  Status GenerateRowWrites(uint64_t num_rows) {
    shared_ptr<KuduTable> table;
    RETURN_NOT_OK(client_->OpenTable(kTableName, &table));
    shared_ptr<KuduSession> session(client_->NewSession());
    session->SetTimeoutMillis(10000);
    RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
    for (uint64_t i = 0; i < num_rows; i++) {
      VLOG(1) << "Generating write for row id " << i;
      unique_ptr<KuduInsert> insert(table->NewInsert());
      GenerateDataForRow(table->schema(), i, &random_, insert->mutable_row());
      RETURN_NOT_OK(session->Apply(insert.release()));
    }
    RETURN_NOT_OK(session->Flush());
    return Status::OK();
  }

  // Create table 'table_name' with a single range partition, then drop that
  // partition, leaving behind a table with no tablets at all.
  void CreateTableWithNoTablet(const string& table_name) {
    // Create a table with one range partition.
    client::KuduSchema schema;
    KuduSchemaBuilder b;
    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
    b.AddColumn("value")->Type(KuduColumnSchema::INT32)->NotNull();
    ASSERT_OK(b.Build(&schema));
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    int lower_bound = 0;
    int upper_bound = 100;
    unique_ptr<KuduPartialRow> lower(schema.NewRow());
    unique_ptr<KuduPartialRow> upper(schema.NewRow());
    ASSERT_OK(lower->SetInt32("key", lower_bound));
    ASSERT_OK(upper->SetInt32("key", upper_bound));
    ASSERT_OK(table_creator->table_name(table_name)
              .schema(&schema)
              .set_range_partition_columns({ "key" })
              .add_range_partition(lower.release(),upper.release())
              .num_replicas(3)
              .Create());

    // Drop range partition.
    lower.reset(schema.NewRow());
    upper.reset(schema.NewRow());
    ASSERT_OK(lower->SetInt32("key", lower_bound));
    ASSERT_OK(upper->SetInt32("key", upper_bound));
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_name));
    table_alterer->DropRangePartition(lower.release(), upper.release());
    ASSERT_OK(table_alterer->Alter());
  }

  // Collect the IDs of all tablets of the test table via scan tokens.
  Status GetTabletIds(vector<string>* tablet_ids) {
    shared_ptr<KuduTable> table;
    RETURN_NOT_OK(client_->OpenTable(kTableName, &table));

    vector<KuduScanToken*> tokens;
    ElementDeleter deleter(&tokens);

    KuduScanTokenBuilder builder(table.get());
    RETURN_NOT_OK(builder.Build(&tokens));
    for (const auto* t : tokens) {
      tablet_ids->emplace_back(t->tablet().id());
    }
    return Status::OK();
  }

  unique_ptr<InternalMiniCluster> mini_cluster_;
  unique_ptr<Ksck> ksck_;
  shared_ptr<client::KuduClient> client_;
  std::shared_ptr<KsckCluster> cluster_;

  // Captures logged messages from ksck.
  std::ostringstream err_stream_;

 private:
  client::KuduSchema schema_;
  Random random_;
};
| |
// Basic sanity check: on a freshly started, healthy cluster every individual
// ksck check phase passes.
TEST_F(RemoteKsckTest, TestClusterOk) {
  ASSERT_OK(ksck_->CheckMasterHealth());
  ASSERT_OK(ksck_->CheckMasterUnusualFlags());
  ASSERT_OK(ksck_->CheckMasterConsensus());
  ASSERT_OK(ksck_->CheckClusterRunning());
  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
  ASSERT_OK(ksck_->FetchInfoFromTabletServers());
  ASSERT_OK(ksck_->CheckTabletServerUnusualFlags());
}
| |
// Setting an experimental flag must be surfaced by ksck's unusual-flag checks
// for both masters and tablet servers.
TEST_F(RemoteKsckTest, TestCheckUnusualFlags) {
  // The mini cluster runs in-process, so setting the process-wide gflag here
  // is presumably visible to both masters and tablet servers — the assertions
  // below rely on that.
  FLAGS_experimental_flag_for_ksck_test = 1;

  ASSERT_OK(ksck_->CheckMasterHealth());
  ASSERT_OK(ksck_->CheckMasterUnusualFlags());
  ASSERT_OK(ksck_->CheckClusterRunning());
  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
  ASSERT_OK(ksck_->FetchInfoFromTabletServers());
  ASSERT_OK(ksck_->CheckTabletServerUnusualFlags());
  ASSERT_OK(ksck_->PrintResults());

  // The flag checks themselves succeed (warnings, not errors); the findings
  // appear in the printed report.
  const string& err_string = err_stream_.str();
  ASSERT_STR_CONTAINS(err_string,
      "Some masters have unsafe, experimental, or hidden flags set");
  ASSERT_STR_CONTAINS(err_string,
      "Some tablet servers have unsafe, experimental, or hidden flags set");
  ASSERT_STR_CONTAINS(err_string, "experimental_flag_for_ksck_test");
}
| |
// Restart one tablet server on the same RPC address but with a different data
// root (and therefore a different UUID); ksck must report the UUID mismatch
// for that server while still reporting the other two as healthy.
TEST_F(RemoteKsckTest, TestTabletServerMismatchedUUID) {
  ASSERT_OK(ksck_->CheckMasterHealth());
  ASSERT_OK(ksck_->CheckMasterUnusualFlags());
  ASSERT_OK(ksck_->CheckMasterConsensus());
  ASSERT_OK(ksck_->CheckClusterRunning());
  ASSERT_OK(ksck_->FetchTableAndTabletInfo());

  // Capture the original server's identity, then bring up a replacement at
  // the same address backed by a fresh filesystem root ("<root>2"), which
  // yields a new UUID.
  tserver::MiniTabletServer* tablet_server = mini_cluster_->mini_tablet_server(0);
  string old_uuid = tablet_server->uuid();
  string root_dir = mini_cluster_->GetTabletServerFsRoot(0) + "2";
  HostPort address = HostPort(tablet_server->bound_rpc_addr());

  tablet_server->Shutdown();
  tserver::MiniTabletServer new_tablet_server(root_dir, address);
  ASSERT_OK(new_tablet_server.Start());
  ASSERT_OK(new_tablet_server.WaitStarted());

  string new_uuid = new_tablet_server.uuid();

  // ksck still expects the old UUID at that address, so fetching info fails
  // with a network error, and the report calls out the mismatch.
  ASSERT_TRUE(ksck_->FetchInfoFromTabletServers().IsNetworkError());
  ASSERT_OK(ksck_->PrintResults());

  string match_string = "Error from $0: Remote error: ID reported by tablet server "
                        "($1) doesn't match the expected ID: $2 (WRONG_SERVER_UUID)";
  ASSERT_STR_CONTAINS(err_stream_.str(), Substitute(match_string, address.ToString(),
                                                    new_uuid, old_uuid));
  // The untouched servers must still show up as HEALTHY with no location.
  tserver::MiniTabletServer* ts = mini_cluster_->mini_tablet_server(1);
  ASSERT_STR_CONTAINS(err_stream_.str(), Substitute("$0 | $1 | HEALTHY | <none>",
                                                    ts->uuid(),
                                                    ts->bound_rpc_addr().ToString()));
  ts = mini_cluster_->mini_tablet_server(2);
  ASSERT_STR_CONTAINS(err_stream_.str(), Substitute("$0 | $1 | HEALTHY | <none>",
                                                    ts->uuid(),
                                                    ts->bound_rpc_addr().ToString()));
}
| |
| TEST_F(RemoteKsckTest, TestTableConsistency) { |
| MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(30); |
| Status s; |
| while (MonoTime::Now() < deadline) { |
| ASSERT_OK(ksck_->CheckMasterHealth()); |
| ASSERT_OK(ksck_->CheckMasterUnusualFlags()); |
| ASSERT_OK(ksck_->CheckMasterConsensus()); |
| ASSERT_OK(ksck_->CheckClusterRunning()); |
| ASSERT_OK(ksck_->FetchTableAndTabletInfo()); |
| ASSERT_OK(ksck_->FetchInfoFromTabletServers()); |
| s = ksck_->CheckTablesConsistency(); |
| if (s.ok()) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| ASSERT_OK(s); |
| } |
| |
| TEST_F(RemoteKsckTest, TestChecksum) { |
| uint64_t num_writes = 100; |
| LOG(INFO) << "Generating row writes..."; |
| ASSERT_OK(GenerateRowWrites(num_writes)); |
| |
| MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(30); |
| Status s; |
| while (MonoTime::Now() < deadline) { |
| ASSERT_OK(ksck_->CheckMasterHealth()); |
| ASSERT_OK(ksck_->CheckMasterUnusualFlags()); |
| ASSERT_OK(ksck_->CheckMasterConsensus()); |
| ASSERT_OK(ksck_->CheckClusterRunning()); |
| ASSERT_OK(ksck_->FetchTableAndTabletInfo()); |
| ASSERT_OK(ksck_->FetchInfoFromTabletServers()); |
| |
| err_stream_.str(""); |
| s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(1), |
| MonoDelta::FromSeconds(1), |
| 16, false, 0)); |
| if (s.ok()) { |
| // Check the status message at the end of the checksum. |
| // We expect '0B from disk' because we didn't write enough data to trigger a flush |
| // in this short-running test. |
| ASSERT_STR_CONTAINS(err_stream_.str(), |
| AllowSlowTests() ? |
| "0/30 replicas remaining (0B from disk, 300 rows summed)" : |
| "0/9 replicas remaining (0B from disk, 300 rows summed)"); |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| ASSERT_OK(s); |
| } |
| |
| TEST_F(RemoteKsckTest, TestChecksumTimeout) { |
| uint64_t num_writes = 10000; |
| LOG(INFO) << "Generating row writes..."; |
| ASSERT_OK(GenerateRowWrites(num_writes)); |
| ASSERT_OK(ksck_->CheckClusterRunning()); |
| ASSERT_OK(ksck_->FetchTableAndTabletInfo()); |
| ASSERT_OK(ksck_->FetchInfoFromTabletServers()); |
| // Use an impossibly low timeout value of zero! |
| Status s = ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromNanoseconds(0), |
| MonoDelta::FromSeconds(5), |
| 16, false, 0)); |
| ASSERT_TRUE(s.IsTimedOut()) << "Expected TimedOut Status, got: " << s.ToString(); |
| } |
| |
// Run a snapshot checksum scan at the client's latest observed timestamp
// while a background thread keeps writing rows; both the scan and the writer
// must finish successfully.
TEST_F(RemoteKsckTest, TestChecksumSnapshot) {
  CountDownLatch started_writing(1);
  AtomicBool continue_writing(true);
  Promise<Status> promise;

  // Allow the checksum scan to wait for longer in case it takes a while for the
  // writer thread to advance safe time.
  FLAGS_scanner_max_wait_ms = 10000;

  thread writer_thread([&]() {
    this->GenerateRowWritesLoop(&started_writing, continue_writing, &promise);
  });
  {
    // Stop the writer and join the thread no matter how the assertions below
    // turn out, so the test never leaks a running thread.
    SCOPED_CLEANUP({
      continue_writing.Store(false);
      writer_thread.join();
    });
    started_writing.Wait();

    // The writer thread may have errored out before actually writing anything;
    // check for this before proceeding with the test.
    const auto* s = promise.WaitFor(MonoDelta::FromSeconds(0));
    if (s) {
      ASSERT_OK(*s);
    }

    uint64_t ts = client_->GetLatestObservedTimestamp();
    ASSERT_OK(ksck_->CheckMasterHealth());
    ASSERT_OK(ksck_->CheckMasterUnusualFlags());
    ASSERT_OK(ksck_->CheckMasterConsensus());
    ASSERT_OK(ksck_->CheckClusterRunning());
    ASSERT_OK(ksck_->FetchTableAndTabletInfo());
    ASSERT_OK(ksck_->FetchInfoFromTabletServers());
    // Snapshot scan (use_snapshot=true) pinned at timestamp 'ts'.
    ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10),
                                                      MonoDelta::FromSeconds(10),
                                                      16, true, ts)));
  }
  // After the join above, the writer's final status is available; it must
  // have exited cleanly too.
  ASSERT_OK(promise.Get());
}
| |
// Test that followers & leader wait until safe time to respond to a snapshot
// scan at current timestamp.
TEST_F(RemoteKsckTest, TestChecksumSnapshotCurrentTimestamp) {
  CountDownLatch started_writing(1);
  AtomicBool continue_writing(true);
  Promise<Status> promise;

  // Allow the checksum scan to wait for longer in case it takes a while for the
  // writer thread to advance safe time.
  FLAGS_scanner_max_wait_ms = 10000;

  thread writer_thread([&]() {
    this->GenerateRowWritesLoop(&started_writing, continue_writing, &promise);
  });
  {
    // Stop the writer and join the thread regardless of assertion failures
    // below, so the test never leaks a running thread.
    SCOPED_CLEANUP({
      continue_writing.Store(false);
      writer_thread.join();
    });
    started_writing.Wait();

    // The writer thread may have errored out before actually writing anything;
    // check for this before proceeding with the test.
    const auto* s = promise.WaitFor(MonoDelta::FromSeconds(0));
    if (s) {
      ASSERT_OK(*s);
    }

    ASSERT_OK(ksck_->CheckMasterHealth());
    ASSERT_OK(ksck_->CheckMasterUnusualFlags());
    ASSERT_OK(ksck_->CheckMasterConsensus());
    ASSERT_OK(ksck_->CheckClusterRunning());
    ASSERT_OK(ksck_->FetchTableAndTabletInfo());
    ASSERT_OK(ksck_->FetchInfoFromTabletServers());
    // It's possible for scans to fail because the tablets haven't been written
    // to yet and haven't elected a leader.
    ASSERT_EVENTUALLY([&] {
      // kCurrentTimestamp asks each server for its own current timestamp
      // instead of pinning one chosen by the client.
      ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(MonoDelta::FromSeconds(10),
                                                        MonoDelta::FromSeconds(10),
                                                        16, true,
                                                        KsckChecksumOptions::kCurrentTimestamp)));
    });
  }
  // The writer must have exited cleanly as well.
  ASSERT_OK(promise.Get());
}
| |
// Regression test for KUDU-2179: If the checksum process takes long enough that
// the snapshot timestamp falls behind the ancient history mark, new replica
// checksums will fail.
TEST_F(RemoteKsckTest, TestChecksumSnapshotLastingLongerThanAHM) {
  // This test is really slow because -tablet_history_max_age_sec's lowest
  // acceptable value is 1.
  SKIP_IF_SLOW_NOT_ALLOWED();

  // This test relies on somewhat precise timing: the timestamp update must
  // happen during the wait to start the checksum, for each tablet. It's likely
  // this sometimes won't happen in builds that are slower, so we'll just
  // disable the test for those builds.
#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
  LOG(WARNING) << "test is skipped in TSAN and ASAN builds";
  return;
#endif

  // Write something so we have rows to checksum, and because we need a valid
  // timestamp from the client to use for a checksum scan.
  constexpr uint64_t num_writes = 100;
  LOG(INFO) << "Generating row writes...";
  ASSERT_OK(GenerateRowWrites(num_writes));

  // Update timestamps frequently.
  FLAGS_timestamp_update_period_ms = 200;
  // Keep history for 1 second. This means snapshot scans with a timestamp older
  // than 1 second will be rejected.
  FLAGS_tablet_history_max_age_sec = 1;
  // Wait for the AHM to pass before assigning a timestamp.
  FLAGS_wait_before_setting_snapshot_timestamp_ms = 1100;

  ASSERT_OK(ksck_->CheckMasterHealth());
  ASSERT_OK(ksck_->CheckMasterUnusualFlags());
  ASSERT_OK(ksck_->CheckMasterConsensus());
  ASSERT_OK(ksck_->CheckClusterRunning());
  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
  ASSERT_OK(ksck_->FetchInfoFromTabletServers());

  // Run a checksum scan at the latest timestamp known to the client. This
  // should fail, since we will wait until after the AHM has passed to start
  // any scans.
  constexpr int timeout_sec = 30;
  constexpr int scan_concurrency = 16;
  constexpr bool use_snapshot = true;
  uint64_t ts = client_->GetLatestObservedTimestamp();
  Status s = ksck_->ChecksumData(KsckChecksumOptions(
      MonoDelta::FromSeconds(timeout_sec),
      MonoDelta::FromSeconds(timeout_sec),
      scan_concurrency,
      use_snapshot,
      ts));
  ASSERT_TRUE(s.IsAborted()) << s.ToString();
  ASSERT_OK(ksck_->PrintResults());
  ASSERT_STR_CONTAINS(err_stream_.str(),
                      "Invalid argument: snapshot scan end timestamp is earlier than "
                      "the ancient history mark.");

  // Now let's try again using the special current timestamp, which will run
  // checksums using timestamps updated from the servers, and should succeed.
  ASSERT_OK(ksck_->ChecksumData(KsckChecksumOptions(
      MonoDelta::FromSeconds(timeout_sec),
      MonoDelta::FromSeconds(timeout_sec),
      scan_concurrency,
      use_snapshot,
      KsckChecksumOptions::kCurrentTimestamp)));
}
| |
// Kill the leader master and verify ksck both detects the unhealthy master
// and still completes its cluster checks via the newly elected leader.
TEST_F(RemoteKsckTest, TestLeaderMasterDown) {
  // Make sure ksck's client is created with the current leader master and that
  // all masters are healthy.
  ASSERT_OK(ksck_->CheckMasterHealth());
  ASSERT_OK(ksck_->CheckMasterUnusualFlags());
  ASSERT_OK(ksck_->CheckMasterConsensus());
  ASSERT_OK(ksck_->CheckClusterRunning());

  // Shut down the leader master.
  int leader_idx;
  ASSERT_OK(mini_cluster_->GetLeaderMasterIndex(&leader_idx));
  mini_cluster_->mini_master(leader_idx)->Shutdown();

  // Check that the bad master health is detected.
  ASSERT_TRUE(ksck_->CheckMasterHealth().IsNetworkError());

  // Try to ksck. The underlying client will need to find the new leader master
  // in order for the test to pass.
  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
  ASSERT_OK(ksck_->FetchInfoFromTabletServers());
}
| |
// With a location mapping command configured, ksck's report must show every
// tablet server (pre-existing and newly added) assigned to location '/foo'
// and a matching location summary table.
TEST_F(RemoteKsckTest, TestClusterWithLocation) {
  // first_argument.sh echoes its first argument, so every server maps to the
  // same location string.
  const string location_cmd_path = JoinPathSegments(GetTestExecutableDirectory(),
                                                    "testdata/first_argument.sh");
  const string location = "/foo";
  FLAGS_location_mapping_cmd = Substitute("$0 $1", location_cmd_path, location);

  // After setting the --location_mapping_cmd flag it's necessary to restart
  // the masters to pick up the new value.
  for (auto idx = 0; idx < mini_cluster_->num_masters(); ++idx) {
    auto* master = mini_cluster_->mini_master(idx);
    master->Shutdown();
    ASSERT_OK(master->Start());
  }

  // Add a fourth tablet server after the flag change to confirm that new
  // servers also get the location assigned.
  ASSERT_OK(mini_cluster_->AddTabletServer());
  ASSERT_EQ(4, mini_cluster_->num_tablet_servers());

  // In case of TSAN builds and running the test at inferior machines
  // with lot of concurrent activity, the masters and tablet servers run Raft
  // re-elections from time to time. Also, establishing and negotiation
  // a connection takes much longer. To avoid flakiness of this test scenario,
  // few calls below are wrapped into ASSERT_EVENTUALLY().
  //
  // TODO(KUDU-2709): remove ASSERT_EVENTUALLY around CheckMasterConsensus
  // when KUDU-2709 is addressed.
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(ksck_->CheckMasterHealth()); // Need to refresh master cstate.
    ASSERT_OK(ksck_->CheckMasterConsensus());
  });
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(ksck_->CheckClusterRunning());
  });
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(ksck_->FetchTableAndTabletInfo());
  });
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(ksck_->FetchInfoFromTabletServers());
  });

  ASSERT_OK(ksck_->PrintResults());
  const string& err_string = err_stream_.str();

  // With the flag set and masters restarted, all tablet servers are assigned
  // with location '/foo': both the existing ones and the newly added.
  for (int idx = 0; idx < mini_cluster_->num_tablet_servers(); ++idx) {
    auto *ts = mini_cluster_->mini_tablet_server(idx);
    ASSERT_STR_CONTAINS(err_string, Substitute("$0 | $1 | HEALTHY | $2",
                                               ts->uuid(),
                                               ts->bound_rpc_addr().ToString(),
                                               location));
  }
  ASSERT_STR_CONTAINS(err_string,
                      "Tablet Server Location Summary\n"
                      " Location | Count\n"
                      "----------+---------\n");
  ASSERT_STR_CONTAINS(err_string,
                      " /foo | 4\n");
}
| |
| // Test filtering on a cluster with no table. |
| TEST_F(RemoteKsckTest, TestFilterOnNoTableCluster) { |
| client_->DeleteTable(kTableName); |
| cluster_->set_table_filters({ "ksck-test-table" }); |
| FLAGS_checksum_scan = true; |
| FLAGS_consensus = false; |
| ASSERT_OK(ksck_->RunAndPrintResults()); |
| ASSERT_STR_CONTAINS(err_stream_.str(), |
| "The cluster doesn't have any matching tables"); |
| } |
| |
// Test filtering on a non-matching table pattern.
TEST_F(RemoteKsckTest, TestNonMatchingTableFilter) {
  cluster_->set_table_filters({ "fake-table" });
  FLAGS_checksum_scan = true;
  FLAGS_consensus = false;
  // The overall run fails because the checksum scan finds no matching table;
  // exactly one error message describing that is expected.
  ASSERT_TRUE(ksck_->RunAndPrintResults().IsRuntimeError());
  const vector<Status>& error_messages = ksck_->results().error_messages;
  ASSERT_EQ(1, error_messages.size());
  EXPECT_EQ("Not found: checksum scan error: No table found. Filter: table_filters=fake-table",
            error_messages[0].ToString());
  ASSERT_STR_CONTAINS(err_stream_.str(),
                      "The cluster doesn't have any matching tables");
}
| |
| // Test filtering with a matching table pattern. |
| TEST_F(RemoteKsckTest, TestMatchingTableFilter) { |
| uint64_t num_writes = 100; |
| LOG(INFO) << "Generating row writes..."; |
| ASSERT_OK(GenerateRowWrites(num_writes)); |
| |
| cluster_->set_table_filters({ "ksck-te*" }); |
| FLAGS_checksum_scan = true; |
| FLAGS_consensus = false; |
| MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(30); |
| Status s; |
| while (MonoTime::Now() < deadline) { |
| ksck_.reset(new Ksck(cluster_, &err_stream_)); |
| s = ksck_->RunAndPrintResults(); |
| if (s.ok()) { |
| // Check the status message at the end of the checksum. |
| // We expect '0B from disk' because we didn't write enough data to trigger a flush |
| // in this short-running test. |
| ASSERT_STR_CONTAINS(err_stream_.str(), |
| AllowSlowTests() ? |
| "0/30 replicas remaining (0B from disk, 300 rows summed)" : |
| "0/9 replicas remaining (0B from disk, 300 rows summed)"); |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| ASSERT_OK(s); |
| } |
| |
// Test filtering on a table with no tablet.
TEST_F(RemoteKsckTest, TestFilterOnNotabletTable) {
  // 'other-table' is created with one range partition which is then dropped,
  // leaving a table that exists but owns zero tablets.
  NO_FATALS(CreateTableWithNoTablet("other-table"));
  cluster_->set_table_filters({ "other-table" });
  FLAGS_checksum_scan = true;
  FLAGS_consensus = false;
  // The run succeeds; the report simply notes that no tablets matched.
  ASSERT_OK(ksck_->RunAndPrintResults());
  ASSERT_STR_CONTAINS(err_stream_.str(),
                      "The cluster doesn't have any matching tablets");
}
| |
// Test filtering on a non-matching tablet id pattern.
TEST_F(RemoteKsckTest, TestNonMatchingTabletIdFilter) {
  cluster_->set_tablet_id_filters({ "tablet-id-fake" });
  FLAGS_checksum_scan = true;
  FLAGS_consensus = false;
  // The run fails because the checksum scan finds no matching replicas;
  // exactly one error message describing that is expected.
  ASSERT_TRUE(ksck_->RunAndPrintResults().IsRuntimeError());
  const vector<Status>& error_messages = ksck_->results().error_messages;
  ASSERT_EQ(1, error_messages.size());
  EXPECT_EQ(
      "Not found: checksum scan error: No tablet replicas found. "
      "Filter: tablet_id_filters=tablet-id-fake",
      error_messages[0].ToString());
  ASSERT_STR_CONTAINS(err_stream_.str(),
                      "The cluster doesn't have any matching tablets");
}
| |
| // Test filtering with a matching tablet ID pattern. |
| TEST_F(RemoteKsckTest, TestMatchingTabletIdFilter) { |
| uint64_t num_writes = 300; |
| LOG(INFO) << "Generating row writes..."; |
| ASSERT_OK(GenerateRowWrites(num_writes)); |
| |
| vector<string> tablet_ids; |
| ASSERT_OK(GetTabletIds(&tablet_ids)); |
| ASSERT_EQ(tablet_ids.size(), AllowSlowTests() ? 10 : 3); |
| |
| cluster_->set_tablet_id_filters({ tablet_ids[0] }); |
| FLAGS_checksum_scan = true; |
| FLAGS_consensus = false; |
| MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(30); |
| Status s; |
| while (MonoTime::Now() < deadline) { |
| ksck_.reset(new Ksck(cluster_, &err_stream_)); |
| s = ksck_->RunAndPrintResults(); |
| if (s.ok()) { |
| // Check the status message at the end of the checksum. |
| // We expect '0B from disk' because we didn't write enough data to trigger a flush |
| // in this short-running test. |
| ASSERT_STR_CONTAINS(err_stream_.str(), |
| "0/3 replicas remaining (0B from disk, 30 rows summed)"); |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| ASSERT_OK(s); |
| } |
| |
// A table filter that matches nothing must produce the "no matching tables"
// note and must not print a summary count table at all.
TEST_F(RemoteKsckTest, TestTableFiltersNoMatch) {
  cluster_->set_table_filters({ "fake-table" });
  FLAGS_consensus = false;
  ASSERT_OK(ksck_->RunAndPrintResults());
  ASSERT_STR_CONTAINS(err_stream_.str(), "The cluster doesn't have any matching tables");
  ASSERT_STR_NOT_CONTAINS(err_stream_.str(),
                          " | Total Count\n"
                          "----------------+-------------\n");
}
| |
| TEST_F(RemoteKsckTest, TestTableFilters) { |
| NO_FATALS(CreateTableWithNoTablet("other-table")); |
| cluster_->set_table_filters({ "ksck-test-table" }); |
| FLAGS_consensus = false; |
| ASSERT_OK(ksck_->RunAndPrintResults()); |
| ASSERT_STR_CONTAINS(err_stream_.str(), |
| AllowSlowTests() ? |
| " | Total Count\n" |
| "----------------+-------------\n" |
| " Masters | 3\n" |
| " Tablet Servers | 3\n" |
| " Tables | 1\n" |
| " Tablets | 10\n" |
| " Replicas | 30\n" : |
| " | Total Count\n" |
| "----------------+-------------\n" |
| " Masters | 3\n" |
| " Tablet Servers | 3\n" |
| " Tables | 1\n" |
| " Tablets | 3\n" |
| " Replicas | 9\n"); |
| } |
| |
// A tablet-ID filter that matches nothing must produce the "no matching
// tablets" note and must not print a summary count table at all.
TEST_F(RemoteKsckTest, TestTabletFiltersNoMatch) {
  cluster_->set_tablet_id_filters({ "tablet-id-fake" });
  FLAGS_consensus = false;
  ASSERT_OK(ksck_->RunAndPrintResults());
  ASSERT_STR_CONTAINS(err_stream_.str(), "The cluster doesn't have any matching tablets");
  ASSERT_STR_NOT_CONTAINS(err_stream_.str(),
                          " | Total Count\n"
                          "----------------+-------------\n");
}
| |
| TEST_F(RemoteKsckTest, TestTabletFilters) { |
| vector<string> tablet_ids; |
| ASSERT_OK(GetTabletIds(&tablet_ids)); |
| ASSERT_EQ(tablet_ids.size(), AllowSlowTests() ? 10 : 3); |
| |
| cluster_->set_tablet_id_filters({ tablet_ids[0] }); |
| FLAGS_consensus = false; |
| ASSERT_OK(ksck_->RunAndPrintResults()); |
| ASSERT_STR_CONTAINS(err_stream_.str(), |
| " | Total Count\n" |
| "----------------+-------------\n" |
| " Masters | 3\n" |
| " Tablet Servers | 3\n" |
| " Tables | 1\n" |
| " Tablets | 1\n" |
| " Replicas | 3\n"); |
| } |
| |
// Make sure the tablet summaries have the 'range_key_begin' field set
// correspondingly for tables with and without range partitions.
TEST_F(RemoteKsckTest, RangeKeysInTabletSummaries) {
  static constexpr const char* const kColumn0 = "c0";
  static constexpr const char* const kColumn1 = "c1";
  static constexpr const char* const kTableWithRanges = "table_with_ranges";
  static constexpr const char* const kTableWithoutRanges = "table_without_ranges";

  SKIP_IF_SLOW_NOT_ALLOWED();

  // Both tables share the same two-column composite-PK schema.
  client::KuduSchema schema;
  KuduSchemaBuilder b;
  b.AddColumn(kColumn0)->Type(KuduColumnSchema::INT32)->NotNull();
  b.AddColumn(kColumn1)->Type(KuduColumnSchema::INT32)->NotNull();
  b.SetPrimaryKey({ kColumn0, kColumn1 });
  ASSERT_OK(b.Build(&schema));

  // Create range-partitioned table.
  {
    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
    ASSERT_OK(lower_bound->SetInt32(kColumn0, 0));
    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
    ASSERT_OK(upper_bound->SetInt32(kColumn0, 1000));

    // One range [0, 1000) on c0 crossed with 2 hash buckets on c1.
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    ASSERT_OK(table_creator->table_name(kTableWithRanges)
              .schema(&schema)
              .set_range_partition_columns({ kColumn0 })
              .add_range_partition(lower_bound.release(), upper_bound.release())
              .add_hash_partitions({ kColumn1 }, 2)
              .num_replicas(3)
              .Create());
  }

  // Create a table with no range partitions.
  {
    // Hash-only partitioning: 3 buckets on c1.
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    ASSERT_OK(table_creator->table_name(kTableWithoutRanges)
              .schema(&schema)
              .add_hash_partitions({ kColumn1 }, 3)
              .num_replicas(3)
              .Create());
  }

  ASSERT_OK(ksck_->FetchTableAndTabletInfo());
  ASSERT_OK(ksck_->FetchInfoFromTabletServers());
  // This scenario doesn't care about consistency of the tables in terms of
  // matching Raft status between the catalog manager and tablet servers, etc.
  // For this scenario, the important point is just to collect the information
  // on tablets.
  ignore_result(ksck_->CheckTablesConsistency());

  // Bucket the collected range_key_begin values by which table the tablet
  // summary belongs to.
  vector<string> range_keys;
  vector<string> no_range_keys;
  const auto& tablet_summaries = ksck_->results().cluster_status.tablet_summaries;
  for (const auto& summary : tablet_summaries) {
    if (summary.table_name == kTableWithoutRanges) {
      no_range_keys.emplace_back(summary.range_key_begin);
    } else if (summary.table_name == kTableWithRanges) {
      range_keys.emplace_back(summary.range_key_begin);
    }
  }

  // Check the results for the range-paritioned table.
  // There is 1 range partition [0, 1000) and 2 hash buckets.
  ASSERT_EQ(2, range_keys.size());
  for (const auto& key : range_keys) {
    ASSERT_FALSE(key.empty()) << key;
  }
  // The keys for the beginning of the range partition should be the same for
  // every tablet reported: that's exactly the same range partition,
  // but different hash buckets.
  ASSERT_EQ(range_keys.front(), range_keys.back());

  // Check the results for the table without range partitions.
  // There are no range partitions: just 3 hash buckets, so all the range start
  // keys in the tablet summaries should be empty.
  ASSERT_EQ(3, no_range_keys.size());
  for (const auto& key : no_range_keys) {
    ASSERT_TRUE(key.empty()) << key;
  }
}
| |
| } // namespace tools |
| } // namespace kudu |