| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <cstdlib> |
| #include <iostream> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/master/catalog_manager.h" |
| #include "kudu/master/master-test-util.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/util/atomic.h" |
| #include "kudu/util/cow_object.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using boost::none; |
| using kudu::client::KuduClient; |
| using kudu::client::KuduClientBuilder; |
| using kudu::client::KuduColumnSchema; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduTableCreator; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::cluster::InternalMiniClusterOptions; |
| using kudu::itest::CreateTabletServerMap; |
| using kudu::itest::TabletServerMap; |
| using kudu::master::MasterServiceProxy; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::MessengerBuilder; |
| |
| DECLARE_int32(heartbeat_interval_ms); |
| DECLARE_bool(log_preallocate_segments); |
| DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test"); |
| |
| using std::shared_ptr; |
| using std::string; |
| using std::thread; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
// Name of the table used by the tests in this file. Fully 'const' so the
// pointer itself cannot be reseated and the variable gets internal linkage
// (it is only referenced from this translation unit).
const char* const kTableName = "test_table";
| |
| class CreateTableStressTest : public KuduTest { |
| public: |
| CreateTableStressTest() { |
| KuduSchemaBuilder b; |
| b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); |
| b.AddColumn("v1")->Type(KuduColumnSchema::INT64)->NotNull(); |
| b.AddColumn("v2")->Type(KuduColumnSchema::STRING)->NotNull(); |
| CHECK_OK(b.Build(&schema_)); |
| } |
| |
| virtual void SetUp() OVERRIDE { |
| // Make heartbeats faster to speed test runtime. |
| FLAGS_heartbeat_interval_ms = 10; |
| |
| // Don't preallocate log segments, since we're creating thousands |
| // of tablets here. If each preallocates 64M or so, we use |
| // a ton of disk space in this test, and it fails on normal |
| // sized /tmp dirs. |
| // TODO: once we collapse multiple tablets into shared WAL files, |
| // this won't be necessary. |
| FLAGS_log_preallocate_segments = false; |
| |
| KuduTest::SetUp(); |
| InternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 3; |
| cluster_.reset(new InternalMiniCluster(env_, opts)); |
| ASSERT_OK(cluster_->Start()); |
| |
| ASSERT_OK(KuduClientBuilder() |
| .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str()) |
| .Build(&client_)); |
| |
| ASSERT_OK(MessengerBuilder("stress-test-msgr") |
| .set_num_reactors(1) |
| .set_max_negotiation_threads(1) |
| .Build(&messenger_)); |
| const auto& addr = cluster_->mini_master()->bound_rpc_addr(); |
| master_proxy_.reset(new MasterServiceProxy(messenger_, addr, addr.host())); |
| ASSERT_OK(CreateTabletServerMap(master_proxy_, messenger_, &ts_map_)); |
| } |
| |
| virtual void TearDown() OVERRIDE { |
| cluster_->Shutdown(); |
| STLDeleteValues(&ts_map_); |
| } |
| |
| void CreateBigTable(const string& table_name, int num_tablets); |
| |
| protected: |
| client::sp::shared_ptr<KuduClient> client_; |
| unique_ptr<InternalMiniCluster> cluster_; |
| KuduSchema schema_; |
| std::shared_ptr<Messenger> messenger_; |
| shared_ptr<MasterServiceProxy> master_proxy_; |
| TabletServerMap ts_map_; |
| }; |
| |
| void CreateTableStressTest::CreateBigTable(const string& table_name, int num_tablets) { |
| vector<const KuduPartialRow*> split_rows; |
| int num_splits = num_tablets - 1; // 4 tablets == 3 splits. |
| // Let the "\x8\0\0\0" keys end up in the first split; start splitting at 1. |
| for (int i = 1; i <= num_splits; i++) { |
| KuduPartialRow* row = schema_.NewRow(); |
| CHECK_OK(row->SetInt32(0, i)); |
| split_rows.push_back(row); |
| } |
| |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| #pragma GCC diagnostic push |
| #pragma GCC diagnostic ignored "-Wdeprecated-declarations" |
| ASSERT_OK(table_creator->table_name(table_name) |
| .schema(&schema_) |
| .set_range_partition_columns({ "key" }) |
| .split_rows(split_rows) |
| .num_replicas(3) |
| .wait(false) |
| .Create()); |
| #pragma GCC diagnostic pop |
| } |
| |
| TEST_F(CreateTableStressTest, CreateAndDeleteBigTable) { |
| if (!AllowSlowTests()) { |
| LOG(INFO) << "Skipping slow test"; |
| return; |
| } |
| string table_name = "test_table"; |
| NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets)); |
| master::GetTableLocationsResponsePB resp; |
| ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
| FLAGS_num_test_tablets, &resp)); |
| LOG(INFO) << "Created table successfully!"; |
| // Use std::cout instead of log, since these responses are large and log |
| // messages have a max size. |
| std::cout << "Response:\n" << pb_util::SecureDebugString(resp); |
| std::cout << "CatalogManager state:\n"; |
| cluster_->mini_master()->master()->catalog_manager()->DumpState(&std::cerr); |
| |
| LOG(INFO) << "Deleting table..."; |
| ASSERT_OK(client_->DeleteTable(table_name)); |
| |
| // The actual removal of the tablets is asynchronous, so we loop for a bit |
| // waiting for them to get removed. |
| LOG(INFO) << "Waiting for tablets to be removed"; |
| vector<string> tablet_ids; |
| for (int i = 0; i < 1000; i++) { |
| ASSERT_OK(itest::ListRunningTabletIds(ts_map_.begin()->second, |
| MonoDelta::FromSeconds(10), |
| &tablet_ids)); |
| if (tablet_ids.empty()) break; |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| ASSERT_TRUE(tablet_ids.empty()) << "Tablets remained: " << tablet_ids; |
| } |
| |
// Restarts the master several times while a large table is being created
// asynchronously, verifying that the creation still completes.
TEST_F(CreateTableStressTest, RestartMasterDuringCreation) {
  if (!AllowSlowTests()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }

  string table_name = "test_table";
  // Kick off the (non-waiting) creation of a many-tablet table.
  NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));

  for (int i = 0; i < 3; i++) {
    // NOTE(review): this sleeps for 500 *microseconds* (half a millisecond),
    // making the restarts nearly back-to-back -- presumably intentional to
    // maximize raciness with tablet creation; confirm it was not meant to
    // be FromMilliseconds().
    SleepFor(MonoDelta::FromMicroseconds(500));
    LOG(INFO) << "Restarting master...";
    cluster_->mini_master()->Shutdown();
    ASSERT_OK(cluster_->mini_master()->Restart());
    // Wait for the restarted master to become leader and finish loading its
    // catalog before restarting it again.
    ASSERT_OK(cluster_->mini_master()->master()->
        WaitUntilCatalogManagerIsLeaderAndReadyForTests(MonoDelta::FromSeconds(5)));
    LOG(INFO) << "Master restarted.";
  }

  // Despite the restarts, the table should eventually finish creating. On
  // failure, dump the catalog manager state first to aid debugging, then
  // crash via CHECK_OK.
  master::GetTableLocationsResponsePB resp;
  Status s = WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                       FLAGS_num_test_tablets, &resp);
  if (!s.ok()) {
    cluster_->mini_master()->master()->catalog_manager()->DumpState(&std::cerr);
    CHECK_OK(s);
  }
}
| |
// Exercises CatalogManager::GetTableLocations() directly with various
// max_returned_locations values (0, 1, half, all) and with a mid-table
// partition key start bound, against a table with many tablets.
TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
  if (!AllowSlowTests()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }

  string table_name = "test_table";
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 1. Creating big table " << table_name << " ...";
  LOG_TIMING(INFO, "creating big table") {
    NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));
  }

  master::GetTableLocationsRequestPB req;
  master::GetTableLocationsResponsePB resp;

  // Make sure the table is completely created before we start poking.
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 2. Waiting for creation of big table "
            << table_name << " to complete...";
  LOG_TIMING(INFO, "waiting for creation of big table") {
    ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                        FLAGS_num_test_tablets, &resp));
  }

  // Acquire the catalog manager leader lock for the rest of the test so
  // GetTableLocations() can be invoked directly; first_failed_status()
  // verifies the lock acquisition succeeded.
  master::CatalogManager* catalog =
      cluster_->mini_master()->master()->catalog_manager();
  master::CatalogManager::ScopedLeaderSharedLock l(catalog);
  ASSERT_OK(l.first_failed_status());

  // Test asking for 0 tablets, should fail
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 3. Asking for zero tablets...";
  LOG_TIMING(INFO, "asking for zero tablets") {
    req.Clear();
    resp.Clear();
    req.mutable_table()->set_table_name(table_name);
    req.set_max_returned_locations(0);
    Status s = catalog->GetTableLocations(&req, &resp, /*user=*/none);
    ASSERT_STR_CONTAINS(s.ToString(), "must be greater than 0");
  }

  // Ask for one, get one, verify
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 4. Asking for one tablet...";
  LOG_TIMING(INFO, "asking for one tablet") {
    req.Clear();
    resp.Clear();
    req.mutable_table()->set_table_name(table_name);
    req.set_max_returned_locations(1);
    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
    ASSERT_EQ(resp.tablet_locations_size(), 1);
    // empty since it's the first
    ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_start(), "");
    // The end key is the encoded form of primary key 1 (the first split).
    ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_end(), string("\x80\0\0\1", 4));
  }

  int half_tablets = FLAGS_num_test_tablets / 2;
  // Ask for half of them, get that number back
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 5. Asking for half the tablets...";
  LOG_TIMING(INFO, "asking for half the tablets") {
    req.Clear();
    resp.Clear();
    req.mutable_table()->set_table_name(table_name);
    req.set_max_returned_locations(half_tablets);
    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
    ASSERT_EQ(half_tablets, resp.tablet_locations_size());
  }

  // Ask for all of them, get that number back
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 6. Asking for all the tablets...";
  LOG_TIMING(INFO, "asking for all the tablets") {
    req.Clear();
    resp.Clear();
    req.mutable_table()->set_table_name(table_name);
    req.set_max_returned_locations(FLAGS_num_test_tablets);
    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
    ASSERT_EQ(FLAGS_num_test_tablets, resp.tablet_locations_size());
  }

  // Dump every table and tablet known to the catalog, and sanity-check the
  // per-table tablet count.
  LOG(INFO) << "========================================================";
  LOG(INFO) << "Tables and tablets:";
  LOG(INFO) << "========================================================";
  std::vector<scoped_refptr<master::TableInfo> > tables;
  catalog->GetAllTables(&tables);
  for (const scoped_refptr<master::TableInfo>& table_info : tables) {
    LOG(INFO) << "Table: " << table_info->ToString();
    std::vector<scoped_refptr<master::TabletInfo> > tablets;
    table_info->GetAllTablets(&tablets);
    for (const scoped_refptr<master::TabletInfo>& tablet_info : tablets) {
      // Take a read lock on the tablet metadata for the duration of the dump.
      master::TabletMetadataLock l_tablet(tablet_info.get(), LockMode::READ);
      const master::SysTabletsEntryPB& metadata = tablet_info->metadata().state().pb;
      LOG(INFO) << "  Tablet: " << tablet_info->ToString()
                << " { start_key: "
                << ((metadata.partition().has_partition_key_start())
                    ? metadata.partition().partition_key_start() : "<< none >>")
                << ", end_key: "
                << ((metadata.partition().has_partition_key_end())
                    ? metadata.partition().partition_key_end() : "<< none >>")
                << ", running = " << tablet_info->metadata().state().is_running() << " }";
    }
    ASSERT_EQ(FLAGS_num_test_tablets, tablets.size());
  }
  LOG(INFO) << "========================================================";

  // Get a single tablet in the middle, make sure we get that one back

  // Encode the primary key of the row at the middle split point to use as
  // the partition_key_start bound.
  unique_ptr<KuduPartialRow> row(schema_.NewRow());
  ASSERT_OK(row->SetInt32(0, half_tablets - 1));
  string start_key_middle;
  ASSERT_OK(row->EncodeRowKey(&start_key_middle));

  LOG(INFO) << "Start key middle: " << start_key_middle;
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 7. Asking for single middle tablet...";
  LOG_TIMING(INFO, "asking for single middle tablet") {
    req.Clear();
    resp.Clear();
    req.mutable_table()->set_table_name(table_name);
    req.set_max_returned_locations(1);
    req.set_partition_key_start(start_key_middle);
    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
    ASSERT_EQ(1, resp.tablet_locations_size())
        << "Response: [" << pb_util::SecureDebugString(resp) << "]";
    // The returned tablet must be exactly the one starting at the
    // requested key.
    ASSERT_EQ(start_key_middle, resp.tablet_locations(0).partition().partition_key_start());
  }
}
| |
// Creates tables and reloads on-disk metadata concurrently to test for races
// between the two operations.
TEST_F(CreateTableStressTest, TestConcurrentCreateTableAndReloadMetadata) {
  AtomicBool stop(false);

  // Background thread that repeatedly forces the catalog manager to reload
  // its metadata until 'stop' is set by the cleanup block below.
  thread reload_metadata_thread([&]() {
    while (!stop.Load()) {
      CHECK_OK(cluster_->mini_master()->master()->catalog_manager()->
          VisitTablesAndTablets());

      // Give table creation a chance to run. TSAN is especially brutal; so
      // let's sleep for longer in such environments.
      // Note: rand() is only called from this one thread here.
#ifdef THREAD_SANITIZER
      int sleep_ms = 10 + rand() % 91;
#else
      int sleep_ms = 1 + rand() % 10;
#endif
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
    }
  });
  // Declared after the thread is started, so on scope exit this runs before
  // the thread object is destroyed: signal the loop to stop, then join.
  SCOPED_CLEANUP({
    stop.Store(true);
    reload_metadata_thread.join();
  });

  // Create 20 small tables while the reload thread is running. The counter
  // is only advanced on success; timeouts fall through to 'continue' below.
  for (int num_tables_created = 0; num_tables_created < 20;) {
    string table_name = Substitute("test-$0", num_tables_created);
    LOG(INFO) << "Creating table " << table_name;
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    Status s = table_creator->table_name(table_name)
        .schema(&schema_)
        .set_range_partition_columns({ "key" })
        .num_replicas(3)
        .wait(false)
        .Create();
    if (s.IsTimedOut()) {
      // The master was busy reloading its metadata, replying with
      // ServiceUnavailable on CreateTable() requests. The client transparently
      // retried (randomized exponential back-off) until the timeout elapsed.
      //
      // It's hard to find some universal constant for timeout which would work
      // in any testing environment instead of simply retrying here. That's
      // because the client uses exponential-with-random back-off strategy
      // while the metadata is being reloaded very often. So, from one side
      // we want to have more or less random interaction between the metadata
      // reloading and the simultaneous table creations, but from the other side
      // it's hard do deduce the universal timeout constant and we prefer to
      // not introduce test flakiness.
      //
      // Note: on timeout, we don't actually know with certainty whether the
      // table was created or not. It's possible for the RPC to be accepted but
      // processed very slowly by the master, for the client to eventually give
      // up and time out the request, all while the master continues and
      // eventually finishes creating the table.
      //
      // TODO(aserbin): update the test keeping its racy essence but making it
      // cleaner regarding this timeout&retry workaround.
      continue;
    }
    ASSERT_OK(s);
    num_tables_created++;
  }
}
| |
| } // namespace kudu |