| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tserver/ts_tablet_manager.h" |
| |
| #include <cstdint> |
| #include <functional> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/tablet/local_tablet_writer.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet-harness.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/heartbeater.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/oid_generator.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
// Number of tablets that the StartupBenchmark test creates before it restarts
// the tablet server.
DEFINE_int32(startup_benchmark_tablet_count_for_testing, 100,
             "Tablet count to do startup benchmark.");

// Flags defined elsewhere in Kudu that tests in this file may override.
// NOTE(review): num_tablets_to_open_simultaneously and
// update_tablet_metrics_interval_ms are not referenced in this file -- confirm
// they are unused and drop them.
DECLARE_bool(enable_leader_failure_detection);
DECLARE_int32(num_tablets_to_open_simultaneously);
DECLARE_bool(tablet_bootstrap_skip_opening_tablet_for_testing);
DECLARE_int32(tablet_metadata_load_inject_latency_ms);
DECLARE_int32(update_tablet_metrics_interval_ms);

// Runs AssertReportHasUpdatedTablet() under NO_FATALS so that a fatal failure
// inside the helper is propagated to the calling test body.
#define ASSERT_REPORT_HAS_UPDATED_TABLET(report, tablet_id) \
  NO_FATALS(AssertReportHasUpdatedTablet(report, tablet_id))

// Runs AssertMonotonicReportSeqno() under NO_FATALS. The expansion is a single
// assertion expression, so callers may append a '<<' failure message to it.
#define ASSERT_MONOTONIC_REPORT_SEQNO(report_seqno, tablet_report) \
  NO_FATALS(AssertMonotonicReportSeqno(report_seqno, tablet_report))
| |
| using kudu::consensus::kInvalidOpIdIndex; |
| using kudu::consensus::RaftConfigPB; |
| using kudu::master::ReportedTabletPB; |
| using kudu::master::TabletReportPB; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::tablet::LocalTabletWriter; |
| using kudu::tablet::Tablet; |
| using kudu::tablet::TabletReplica; |
| using std::nullopt; |
| using std::optional; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| class FsManager; |
| |
| namespace tserver { |
| |
| class TsTabletManagerTest : public KuduTest { |
| public: |
| TsTabletManagerTest() |
| : schema_({ ColumnSchema("key", INT32) }, 1) { |
| } |
| |
| virtual void SetUp() OVERRIDE { |
| KuduTest::SetUp(); |
| mini_server_.reset(new MiniTabletServer(GetTestPath("TsTabletManagerTest-fsroot"), |
| HostPort("127.0.0.1", 0))); |
| ASSERT_OK(mini_server_->Start()); |
| mini_server_->FailHeartbeats(); |
| |
| config_ = mini_server_->CreateLocalConfig(); |
| |
| tablet_manager_ = mini_server_->server()->tablet_manager(); |
| fs_manager_ = mini_server_->server()->fs_manager(); |
| heartbeater_ = mini_server_->server()->heartbeater(); |
| } |
| |
| virtual void TearDown() OVERRIDE { |
| KuduTest::TearDown(); |
| } |
| |
| Status CreateNewTablet(const std::string& tablet_id, |
| const Schema& schema, |
| bool wait_leader, |
| optional<TableExtraConfigPB> extra_config, |
| optional<std::string> dimension_label, |
| scoped_refptr<tablet::TabletReplica>* out_tablet_replica) { |
| Schema full_schema = SchemaBuilder(schema).Build(); |
| std::pair<PartitionSchema, Partition> partition = tablet::CreateDefaultPartition(full_schema); |
| |
| scoped_refptr<tablet::TabletReplica> tablet_replica; |
| RETURN_NOT_OK(tablet_manager_->CreateNewTablet(tablet_id, tablet_id, partition.second, |
| tablet_id, |
| full_schema, partition.first, |
| config_, |
| std::move(extra_config), |
| std::move(dimension_label), |
| /*table_type*/nullopt, |
| &tablet_replica)); |
| if (out_tablet_replica) { |
| (*out_tablet_replica) = tablet_replica; |
| } |
| |
| RETURN_NOT_OK(tablet_replica->WaitUntilConsensusRunning(MonoDelta::FromMilliseconds(2000))); |
| return wait_leader ? tablet_replica->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10)) : |
| Status::OK(); |
| } |
| |
| void GenerateFullTabletReport(TabletReportPB* report) { |
| vector<TabletReportPB> reports = |
| heartbeater_->GenerateFullTabletReportsForTests(); |
| ASSERT_EQ(1, reports.size()); |
| report->CopyFrom(reports[0]); |
| } |
| |
| void GenerateIncrementalTabletReport(TabletReportPB* report) { |
| vector<TabletReportPB> reports = |
| heartbeater_->GenerateIncrementalTabletReportsForTests(); |
| ASSERT_EQ(1, reports.size()); |
| report->CopyFrom(reports[0]); |
| } |
| |
| void MarkTabletReportAcknowledged(const TabletReportPB& report) { |
| heartbeater_->MarkTabletReportsAcknowledgedForTests({ report }); |
| } |
| |
| void InsertTestRows(Tablet* tablet, int64_t count) { |
| LocalTabletWriter writer(tablet, &schema_); |
| KuduPartialRow row(&schema_); |
| for (int64_t i = 0; i < count; i++) { |
| ASSERT_OK(row.SetInt32(0, i)); |
| ASSERT_OK(writer.Insert(row)); |
| } |
| } |
| |
| protected: |
| unique_ptr<MiniTabletServer> mini_server_; |
| FsManager* fs_manager_; |
| TSTabletManager* tablet_manager_; |
| Heartbeater* heartbeater_; |
| |
| Schema schema_; |
| RaftConfigPB config_; |
| }; |
| |
| TEST_F(TsTabletManagerTest, TestCreateTablet) { |
| string tablet1 = "0fffffffffffffffffffffffffffffff"; |
| string tablet2 = "1fffffffffffffffffffffffffffffff"; |
| scoped_refptr<TabletReplica> replica1; |
| scoped_refptr<TabletReplica> replica2; |
| TableExtraConfigPB extra_config; |
| extra_config.set_history_max_age_sec(7200); |
| |
| // Create a new tablet. |
| ASSERT_OK(CreateNewTablet(tablet1, schema_, true, nullopt, nullopt, &replica1)); |
| // Create a new tablet with extra config. |
| ASSERT_OK(CreateNewTablet(tablet2, schema_, true, extra_config, nullopt, &replica2)); |
| ASSERT_EQ(tablet1, replica1->tablet()->tablet_id()); |
| ASSERT_EQ(tablet2, replica2->tablet()->tablet_id()); |
| ASSERT_EQ(nullopt, replica1->tablet()->metadata()->extra_config()); |
| ASSERT_NE(nullopt, replica2->tablet()->metadata()->extra_config()); |
| ASSERT_EQ(7200, replica2->tablet()->metadata()->extra_config()->history_max_age_sec()); |
| replica1.reset(); |
| replica2.reset(); |
| |
| // Re-load the tablet manager from the filesystem. |
| LOG(INFO) << "Shutting down tablet manager"; |
| mini_server_->Shutdown(); |
| LOG(INFO) << "Restarting tablet manager"; |
| mini_server_.reset(new MiniTabletServer(GetTestPath("TsTabletManagerTest-fsroot"), |
| HostPort("127.0.0.1", 0))); |
| ASSERT_OK(mini_server_->Start()); |
| ASSERT_OK(mini_server_->WaitStarted()); |
| tablet_manager_ = mini_server_->server()->tablet_manager(); |
| |
| // Ensure that the tablet got re-loaded and re-opened off disk. |
| ASSERT_TRUE(tablet_manager_->LookupTablet(tablet1, &replica1)); |
| ASSERT_TRUE(tablet_manager_->LookupTablet(tablet2, &replica2)); |
| ASSERT_EQ(tablet1, replica1->tablet()->tablet_id()); |
| ASSERT_EQ(tablet2, replica2->tablet()->tablet_id()); |
| ASSERT_EQ(nullopt, replica1->tablet()->metadata()->extra_config()); |
| ASSERT_NE(nullopt, replica2->tablet()->metadata()->extra_config()); |
| ASSERT_EQ(7200, replica2->tablet()->metadata()->extra_config()->history_max_age_sec()); |
| } |
| |
| static void AssertMonotonicReportSeqno(int64_t* report_seqno, |
| const TabletReportPB &report) { |
| ASSERT_LT(*report_seqno, report.sequence_number()); |
| *report_seqno = report.sequence_number(); |
| } |
| |
| static void AssertReportHasUpdatedTablet(const TabletReportPB& report, |
| const string& tablet_id) { |
| ASSERT_GE(report.updated_tablets_size(), 0); |
| bool found_tablet = false; |
| for (const ReportedTabletPB& reported_tablet : report.updated_tablets()) { |
| if (reported_tablet.tablet_id() == tablet_id) { |
| found_tablet = true; |
| ASSERT_TRUE(reported_tablet.has_consensus_state()); |
| ASSERT_TRUE(reported_tablet.consensus_state().has_current_term()) |
| << SecureShortDebugString(reported_tablet); |
| ASSERT_FALSE(reported_tablet.consensus_state().leader_uuid().empty()) |
| << SecureShortDebugString(reported_tablet); |
| ASSERT_TRUE(reported_tablet.consensus_state().has_committed_config()); |
| const RaftConfigPB& committed_config = reported_tablet.consensus_state().committed_config(); |
| ASSERT_EQ(kInvalidOpIdIndex, committed_config.opid_index()); |
| ASSERT_EQ(1, committed_config.peers_size()); |
| ASSERT_TRUE(committed_config.peers(0).has_permanent_uuid()) |
| << SecureShortDebugString(reported_tablet); |
| ASSERT_EQ(committed_config.peers(0).permanent_uuid(), |
| reported_tablet.consensus_state().leader_uuid()) |
| << SecureShortDebugString(reported_tablet); |
| } |
| } |
| ASSERT_TRUE(found_tablet); |
| } |
| |
| TEST_F(TsTabletManagerTest, TestTabletReports) { |
| TabletReportPB report; |
| int64_t seqno = -1; |
| |
| // Generate a tablet report before any tablets are loaded. Should be empty. |
| GenerateFullTabletReport(&report); |
| ASSERT_FALSE(report.is_incremental()); |
| ASSERT_EQ(0, report.updated_tablets().size()); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| MarkTabletReportAcknowledged(report); |
| |
| // Another report should now be incremental, but with no changes. |
| GenerateIncrementalTabletReport(&report); |
| ASSERT_TRUE(report.is_incremental()); |
| ASSERT_EQ(0, report.updated_tablets().size()); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| MarkTabletReportAcknowledged(report); |
| |
| // Create a tablet and do another incremental report - should include the tablet. |
| ASSERT_OK(CreateNewTablet("tablet-1", schema_, true, nullopt, nullopt, nullptr)); |
| int updated_tablets = 0; |
| while (updated_tablets != 1) { |
| GenerateIncrementalTabletReport(&report); |
| updated_tablets = report.updated_tablets().size(); |
| ASSERT_TRUE(report.is_incremental()); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| } |
| ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
| |
| // If we don't acknowledge the report, and ask for another incremental report, |
| // it should include the tablet again. |
| GenerateIncrementalTabletReport(&report); |
| ASSERT_TRUE(report.is_incremental()); |
| ASSERT_EQ(1, report.updated_tablets().size()); |
| ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| |
| // Now acknowledge the last report, and further incrementals should be empty. |
| MarkTabletReportAcknowledged(report); |
| GenerateIncrementalTabletReport(&report); |
| ASSERT_TRUE(report.is_incremental()); |
| ASSERT_EQ(0, report.updated_tablets().size()); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| MarkTabletReportAcknowledged(report); |
| |
| // Create a second tablet, and ensure the incremental report shows it. |
| ASSERT_OK(CreateNewTablet("tablet-2", schema_, true, nullopt, nullopt, nullptr)); |
| |
| // Wait up to 10 seconds to get a tablet report from tablet-2. |
| // TabletReplica does not mark tablets dirty until after it commits the |
| // initial configuration change, so there is also a window for tablet-1 to |
| // have been marked dirty since the last report. |
| MonoDelta timeout(MonoDelta::FromSeconds(10)); |
| MonoTime start(MonoTime::Now()); |
| report.Clear(); |
| while (true) { |
| bool found_tablet_2 = false; |
| GenerateIncrementalTabletReport(&report); |
| ASSERT_TRUE(report.is_incremental()) << SecureShortDebugString(report); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report) << SecureShortDebugString(report); |
| for (const ReportedTabletPB& reported_tablet : report.updated_tablets()) { |
| if (reported_tablet.tablet_id() == "tablet-2") { |
| found_tablet_2 = true; |
| break; |
| } |
| } |
| if (found_tablet_2) break; |
| MonoDelta elapsed(MonoTime::Now() - start); |
| ASSERT_TRUE(elapsed < timeout) |
| << "Waited too long for tablet-2 to be marked dirty: " |
| << elapsed.ToString() << ". " |
| << "Latest report: " << SecureShortDebugString(report); |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
| MarkTabletReportAcknowledged(report); |
| |
| // Asking for a full tablet report should re-report both tablets |
| GenerateFullTabletReport(&report); |
| ASSERT_FALSE(report.is_incremental()); |
| ASSERT_EQ(2, report.updated_tablets().size()); |
| ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
| ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-2"); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| } |
| |
| TEST_F(TsTabletManagerTest, TestTabletStatsReports) { |
| TabletReportPB report; |
| int64_t seqno = -1; |
| const int64_t kCount = 12; |
| |
| // 1. Create two tablets. |
| scoped_refptr<tablet::TabletReplica> replica1; |
| ASSERT_OK(CreateNewTablet("tablet-1", schema_, true, nullopt, nullopt, &replica1)); |
| ASSERT_OK(CreateNewTablet("tablet-2", schema_, true, nullopt, nullopt, nullptr)); |
| |
| // 2. Do a full report - should include two tablets and statistics are uninitialized. |
| NO_FATALS(GenerateFullTabletReport(&report)); |
| ASSERT_FALSE(report.is_incremental()); |
| ASSERT_EQ(2, report.updated_tablets().size()); |
| ASSERT_TRUE(report.updated_tablets(0).has_stats()); |
| ASSERT_TRUE(report.updated_tablets(1).has_stats()); |
| ASSERT_FALSE(report.updated_tablets(0).stats().has_on_disk_size()); |
| ASSERT_FALSE(report.updated_tablets(0).stats().has_live_row_count()); |
| ASSERT_FALSE(report.updated_tablets(1).stats().has_on_disk_size()); |
| ASSERT_FALSE(report.updated_tablets(1).stats().has_live_row_count()); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| MarkTabletReportAcknowledged(report); |
| |
| // 3. Trigger updates to tablet statistics as soon as possible. |
| tablet_manager_->SetNextUpdateTimeForTests(); |
| heartbeater_->TriggerASAP(); |
| |
| // Do an incremental report - should include two tablets and statistics have been initialized. |
| ASSERT_EVENTUALLY([&] () { |
| NO_FATALS(GenerateIncrementalTabletReport(&report)); |
| ASSERT_TRUE(report.is_incremental()); |
| ASSERT_EQ(2, report.updated_tablets().size()); |
| }); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| for (int i = 0; i < 2; ++i) { |
| ASSERT_TRUE(report.updated_tablets(i).has_stats()); |
| ASSERT_GT(report.updated_tablets(i).stats().on_disk_size(), 0); |
| ASSERT_EQ(0, report.updated_tablets(i).stats().live_row_count()); |
| } |
| ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
| ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-2"); |
| MarkTabletReportAcknowledged(report); |
| |
| // Clean the pending dirty tablets that are not acknowledged since the seqno race. |
| ASSERT_EVENTUALLY([&] () { |
| NO_FATALS(GenerateIncrementalTabletReport(&report)); |
| ASSERT_TRUE(report.is_incremental()); |
| MarkTabletReportAcknowledged(report); |
| ASSERT_EQ(0, report.updated_tablets().size()); |
| }); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| |
| // 4. Write some test rows to 'tablet-1'. |
| NO_FATALS(InsertTestRows(replica1->tablet(), kCount)); |
| |
| // Trigger updates to tablet statistics as soon as possible again. |
| tablet_manager_->SetNextUpdateTimeForTests(); |
| heartbeater_->TriggerASAP(); |
| |
| // Do an incremental report - should include the tablet and check the statistics. |
| ASSERT_EVENTUALLY([&] () { |
| NO_FATALS(GenerateIncrementalTabletReport(&report)); |
| ASSERT_TRUE(report.is_incremental()); |
| ASSERT_EQ(1, report.updated_tablets().size()); |
| }); |
| ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
| ASSERT_TRUE(report.updated_tablets(0).has_stats()); |
| ASSERT_GT(report.updated_tablets(0).stats().on_disk_size(), 0); |
| ASSERT_EQ(kCount, report.updated_tablets(0).stats().live_row_count()); |
| ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
| MarkTabletReportAcknowledged(report); |
| } |
| |
| TEST_F(TsTabletManagerTest, StartupBenchmark) { |
| const int64_t kTabletCount = FLAGS_startup_benchmark_tablet_count_for_testing; |
| |
| FLAGS_enable_leader_failure_detection = false; |
| |
| // Mute logs, cause there are too many tablets to be created. |
| FLAGS_minloglevel = 2; |
| ObjectIdGenerator generator; |
| for (int i = 0; i < kTabletCount; i++) { |
| KLOG_EVERY_N_SECS(ERROR, 1) << Substitute("Created tablet ($0/$1 complete)", i, kTabletCount); |
| ASSERT_OK(CreateNewTablet(generator.Next(), schema_, false, nullopt, nullopt, nullptr)); |
| } |
| |
| mini_server_->Shutdown(); |
| // Revert log level to see how much time cost when load tablet metadata. |
| FLAGS_minloglevel = 0; |
| FLAGS_tablet_bootstrap_skip_opening_tablet_for_testing = true; |
| FLAGS_tablet_metadata_load_inject_latency_ms = 2; |
| ASSERT_OK(mini_server_->Start()); |
| // Mute logs, cause there are too many tablets to be shutdown. |
| FLAGS_minloglevel = 2; |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |