| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <stdint.h> |
| |
| #include <algorithm> |
| #include <functional> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/walltime.h" |
| #include "kudu/master/catalog_manager.h" |
| #include "kudu/master/master-test-util.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/mini_master.h" |
| #include "kudu/master/sys_catalog.h" |
| #include "kudu/master/ts_descriptor.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/security/test/test_certs.h" |
| #include "kudu/security/tls_context.h" |
| #include "kudu/security/token_verifier.h" |
| #include "kudu/server/webserver_options.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/tablet_server_options.h" |
| #include "kudu/util/curl_util.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| #include "kudu/util/version_info.h" |
| |
| DECLARE_int32(heartbeat_interval_ms); |
| METRIC_DECLARE_counter(rows_inserted); |
| METRIC_DECLARE_counter(rows_updated); |
| |
| using boost::none; |
| using kudu::cluster::InternalMiniCluster; |
| using kudu::master::CatalogManager; |
| using kudu::master::CreateTableRequestPB; |
| using kudu::master::CreateTableResponsePB; |
| using kudu::master::GetTableLocationsResponsePB; |
| using kudu::master::IsCreateTableDoneRequestPB; |
| using kudu::master::IsCreateTableDoneResponsePB; |
| using kudu::master::MiniMaster; |
| using kudu::master::TSDescriptor; |
| using kudu::master::TabletLocationsPB; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::tserver::MiniTabletServer; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| |
| namespace kudu { |
| |
| void CreateTableForTesting(MiniMaster* mini_master, |
| const string& table_name, |
| const Schema& schema, |
| string* tablet_id) { |
| { |
| CreateTableRequestPB req; |
| CreateTableResponsePB resp; |
| |
| req.set_name(table_name); |
| req.set_num_replicas(1); |
| req.set_owner("jdoe"); |
| ASSERT_OK(SchemaToPB(schema, req.mutable_schema())); |
| CatalogManager* catalog = mini_master->master()->catalog_manager(); |
| CatalogManager::ScopedLeaderSharedLock l(catalog); |
| ASSERT_OK(l.first_failed_status()); |
| ASSERT_OK(catalog->CreateTable(&req, &resp, nullptr)); |
| } |
| |
| int wait_time = 1000; |
| bool is_table_created = false; |
| for (int i = 0; i < 80; ++i) { |
| IsCreateTableDoneRequestPB req; |
| IsCreateTableDoneResponsePB resp; |
| |
| req.mutable_table()->set_table_name(table_name); |
| CatalogManager* catalog = mini_master->master()->catalog_manager(); |
| { |
| CatalogManager::ScopedLeaderSharedLock l(catalog); |
| ASSERT_OK(l.first_failed_status()); |
| ASSERT_OK(catalog->IsCreateTableDone(&req, &resp, /*user=*/none)); |
| } |
| if (resp.done()) { |
| is_table_created = true; |
| break; |
| } |
| |
| VLOG(1) << "Waiting for table '" << table_name << "'to be created"; |
| |
| SleepFor(MonoDelta::FromMicroseconds(wait_time)); |
| wait_time = std::min(wait_time * 5 / 4, 1000000); |
| } |
| ASSERT_TRUE(is_table_created); |
| |
| GetTableLocationsResponsePB resp; |
| ASSERT_OK(WaitForRunningTabletCount(mini_master, table_name, 1, &resp)); |
| *tablet_id = resp.tablet_locations(0).tablet_id(); |
| LOG(INFO) << "Got tablet " << *tablet_id << " for table " << table_name; |
| } |
| |
| |
| // Tests for the Tablet Server registering with the Master, |
| // and the master maintaining the tablet descriptor. |
| class RegistrationTest : public KuduTest { |
| public: |
| RegistrationTest() |
| : schema_({ ColumnSchema("c1", UINT32) }, 1) { |
| } |
| |
| void SetUp() override { |
| // Make heartbeats faster to speed test runtime. |
| FLAGS_heartbeat_interval_ms = 10; |
| setup_time_ = WallTime_Now(); |
| |
| KuduTest::SetUp(); |
| |
| cluster_.reset(new InternalMiniCluster(env_, {})); |
| ASSERT_OK(cluster_->Start()); |
| } |
| |
| void TearDown() override { |
| cluster_->Shutdown(); |
| } |
| |
| void CheckTabletServersPage(string* contents = nullptr) { |
| EasyCurl c; |
| faststring buf; |
| string addr = cluster_->mini_master()->bound_http_addr().ToString(); |
| ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/tablet-servers", addr), |
| &buf)); |
| string buf_str = buf.ToString(); |
| |
| // Should include the TS UUID |
| string expected_uuid = |
| cluster_->mini_tablet_server(0)->server()->instance_pb().permanent_uuid(); |
| ASSERT_STR_CONTAINS(buf_str, expected_uuid); |
| |
| // Should check that the TS software version is included on the page. |
| // tserver version should be the same as returned by GetVersionInfo() |
| ASSERT_STR_CONTAINS(buf_str, VersionInfo::GetVersionInfo()); |
| if (contents != nullptr) { |
| *contents = std::move(buf_str); |
| } |
| } |
| |
| Status WaitForReplicaCount(const string& tablet_id, |
| int expected_count, |
| TabletLocationsPB* locations = nullptr) { |
| while (true) { |
| master::CatalogManager* catalog = |
| cluster_->mini_master()->master()->catalog_manager(); |
| Status s; |
| TabletLocationsPB loc; |
| CatalogManager::TSInfosDict infos_dict; |
| do { |
| master::CatalogManager::ScopedLeaderSharedLock l(catalog); |
| const Status& ls = l.first_failed_status(); |
| if (ls.IsServiceUnavailable() || ls.IsIllegalState()) { |
| // ServiceUnavailable means catalog manager is not yet ready |
| // to serve requests; IllegalState means the catalog is not the |
| // leader yet. That's an indication of a transient state where it's |
| // it's necessary to try again later. |
| break; // exiting out of the 'do {...} while (false)' scope |
| } |
| RETURN_NOT_OK(ls); |
| s = catalog->GetTabletLocations(tablet_id, |
| master::VOTER_REPLICA, |
| &loc, |
| &infos_dict, |
| /*user=*/none); |
| } while (false); |
| if (s.ok() && loc.interned_replicas_size() == expected_count) { |
| if (locations) { |
| *locations = std::move(loc); |
| } |
| return Status::OK(); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| } |
| |
| protected: |
| unique_ptr<InternalMiniCluster> cluster_; |
| Schema schema_; |
| int64_t setup_time_; |
| }; |
| |
| TEST_F(RegistrationTest, TestTSRegisters) { |
| // Wait for the TS to register. |
| vector<shared_ptr<TSDescriptor>> descs; |
| ASSERT_OK(cluster_->WaitForTabletServerCount( |
| 1, InternalMiniCluster::MatchMode::MATCH_TSERVERS, &descs)); |
| ASSERT_EQ(1, descs.size()); |
| |
| // Verify that the registration is sane. |
| ServerRegistrationPB reg; |
| descs[0]->GetRegistration(®); |
| { |
| SCOPED_TRACE(SecureShortDebugString(reg)); |
| ASSERT_EQ(string::npos, SecureShortDebugString(reg).find("0.0.0.0")) |
| << "Should not include wildcards in registration"; |
| } |
| |
| NO_FATALS(CheckTabletServersPage()); |
| |
| // Restart the master, so it loses the descriptor, and ensure that the |
| // heartbeater thread handles re-registering. |
| cluster_->mini_master()->Shutdown(); |
| ASSERT_OK(cluster_->mini_master()->Restart()); |
| |
| const int64_t seqno = descs.back()->latest_seqno(); |
| descs.clear(); |
| ASSERT_OK(cluster_->WaitForTabletServerCount( |
| 1, InternalMiniCluster::MatchMode::MATCH_TSERVERS, &descs)); |
| ASSERT_EQ(descs.back()->latest_seqno(), seqno); |
| |
| // Restart the tserver, so it will register, and ensure that it gets the |
| // newer sequence number. |
| cluster_->mini_tablet_server(0)->Shutdown(); |
| ASSERT_OK(cluster_->mini_tablet_server(0)->Restart()); |
| |
| descs.clear(); |
| ASSERT_OK(cluster_->WaitForTabletServerCount( |
| 1, InternalMiniCluster::MatchMode::MATCH_TSERVERS, &descs)); |
| ASSERT_GT(descs.back()->latest_seqno(), seqno); |
| } |
| |
| TEST_F(RegistrationTest, TestMasterSoftwareVersion) { |
| // Verify that the master's software version exists. |
| ServerRegistrationPB reg; |
| cluster_->mini_master()->master()->GetMasterRegistration(®); |
| { |
| SCOPED_TRACE(SecureShortDebugString(reg)); |
| ASSERT_TRUE(reg.has_software_version()); |
| ASSERT_STR_CONTAINS(reg.software_version(), |
| VersionInfo::GetVersionInfo()); |
| ASSERT_LE(setup_time_, reg.start_time()); |
| ASSERT_LE(reg.start_time(), WallTime_Now()); |
| } |
| } |
| |
| TEST_F(RegistrationTest, TestServerStartWallTime) { |
| ServerRegistrationPB reg; |
| cluster_->mini_master()->master()->GetMasterRegistration(®); |
| ASSERT_LE(setup_time_, reg.start_time()); |
| ASSERT_LE(reg.start_time(), WallTime_Now()); |
| |
| for (int i = 0; i < cluster_->num_tablet_servers(); ++i) { |
| auto start_walltime = |
| cluster_->mini_tablet_server(i)->server()->start_walltime(); |
| ASSERT_LE(setup_time_, start_walltime); |
| ASSERT_LE(start_walltime, WallTime_Now()); |
| } |
| } |
| |
| // Test starting multiple tablet servers and ensuring they both register with the master. |
| TEST_F(RegistrationTest, TestMultipleTS) { |
| ASSERT_OK(cluster_->AddTabletServer()); |
| ASSERT_OK(cluster_->WaitForTabletServerCount(2)); |
| } |
| |
| // TODO: this doesn't belong under "RegistrationTest" - rename this file |
| // to something more appropriate - doesn't seem worth having separate |
| // whole test suites for registration, tablet reports, etc. |
| TEST_F(RegistrationTest, TestTabletReports) { |
| auto GetCatalogMetric = [&](CounterPrototype& prototype) { |
| auto metrics = cluster_->mini_master()->master()->catalog_manager()-> |
| sys_catalog()->tablet_replica()->tablet()->GetMetricEntity(); |
| return prototype.Instantiate(metrics)->value(); |
| }; |
| const int startup_rows_inserted = GetCatalogMetric(METRIC_rows_inserted); |
| |
| // Add a table, make sure it reports itself. |
| string tablet_id_1; |
| NO_FATALS(CreateTableForTesting( |
| cluster_->mini_master(), "tablet-reports-1", schema_, &tablet_id_1)); |
| TabletLocationsPB locs_1; |
| ASSERT_OK(WaitForReplicaCount(tablet_id_1, 1, &locs_1)); |
| ASSERT_EQ(1, locs_1.interned_replicas_size()); |
| |
| // Check that we inserted the right number of rows for the new single-tablet table |
| // (one for the table, one for the tablet). |
| const int post_create_rows_inserted = GetCatalogMetric(METRIC_rows_inserted); |
| EXPECT_EQ(2, post_create_rows_inserted - startup_rows_inserted) |
| << "Should have inserted one row each for the table and tablet"; |
| |
| // Add another table, make sure it is reported via incremental. |
| string tablet_id_2; |
| NO_FATALS(CreateTableForTesting( |
| cluster_->mini_master(), "tablet-reports-2", schema_, &tablet_id_2)); |
| ASSERT_OK(WaitForReplicaCount(tablet_id_2, 1)); |
| |
| // Shut down the whole system, bring it back up, and make sure the tablets |
| // are reported. |
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Start()); |
| |
| // After restart, check that the tablet reports produced the expected number of |
| // writes to the catalog table: |
| // - No inserts, because there are no new tablets. |
| // - Two updates, since both replicas increase their term on restart. |
| // |
| // It can take some time for the TS to re-heartbeat. To avoid flakiness, here |
| // it's easier to wait for the target non-zero metric value instead of |
| // hard-coding an extra delay. |
| AssertEventually([&]() { |
| ASSERT_EQ(0, GetCatalogMetric(METRIC_rows_inserted)); |
| ASSERT_EQ(2, GetCatalogMetric(METRIC_rows_updated)); |
| }); |
| |
| // If we restart just the master, it should not write any data to the catalog, since the |
| // tablets themselves are not changing term, etc. |
| cluster_->mini_master()->Shutdown(); |
| ASSERT_OK(cluster_->mini_master()->Restart()); |
| |
| // Sleep for a second to make sure the TS has plenty of time to re-heartbeat. |
| // The metrics are updated after processing TS heartbeats, and we want to |
| // capture updated metric readings. |
| SleepFor(MonoDelta::FromSeconds(1)); |
| |
| EXPECT_EQ(0, GetCatalogMetric(METRIC_rows_inserted)); |
| EXPECT_EQ(0, GetCatalogMetric(METRIC_rows_updated)); |
| } |
| |
| // Check that after the tablet server registers, it gets a signed cert |
| // from the master. |
| TEST_F(RegistrationTest, TestTSGetsSignedX509Certificate) { |
| MiniTabletServer* ts = cluster_->mini_tablet_server(0); |
| ASSERT_EVENTUALLY([&](){ |
| ASSERT_TRUE(ts->server()->tls_context().has_signed_cert()); |
| }); |
| } |
| |
| // Check that after the tablet server registers, it gets the list of valid |
| // public token signing keys. |
| TEST_F(RegistrationTest, TestTSGetsTskList) { |
| MiniTabletServer* ts = cluster_->mini_tablet_server(0); |
| ASSERT_EVENTUALLY([&](){ |
| ASSERT_FALSE(ts->server()->token_verifier().ExportKeys().empty()); |
| }); |
| } |
| |
| // Test that, if the tserver has HTTPS enabled, the master links to it |
| // via https:// URLs and not http://. |
| TEST_F(RegistrationTest, TestExposeHttpsURLs) { |
| MiniTabletServer* ts = cluster_->mini_tablet_server(0); |
| string password; |
| WebserverOptions* opts = &ts->options()->webserver_opts; |
| ASSERT_OK(security::CreateTestSSLCertWithEncryptedKey(GetTestDataDirectory(), |
| &opts->certificate_file, |
| &opts->private_key_file, |
| &password)); |
| opts->private_key_password_cmd = strings::Substitute("echo $0", password); |
| ts->Shutdown(); |
| ASSERT_OK(ts->Start()); |
| |
| // The URL displayed on the page uses a hostname. Rather than |
| // dealing with figuring out what the hostname should be, just |
| // use a more permissive regex which doesn't check the host. |
| string expected_url_regex = strings::Substitute( |
| "https://[a-zA-Z0-9.-]+:$0", opts->port); |
| |
| // Need "eventually" here because the tserver may take a few seconds |
| // to re-register while starting up. |
| ASSERT_EVENTUALLY([&](){ |
| string contents; |
| NO_FATALS(CheckTabletServersPage(&contents)); |
| ASSERT_STR_MATCHES(contents, expected_url_regex); |
| }); |
| } |
| |
| } // namespace kudu |