| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <sys/types.h> |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <functional> |
| #include <initializer_list> |
| #include <limits> |
| #include <memory> |
| #include <ostream> |
| #include <set> |
| #include <string> |
| #include <tuple> // IWYU pragma: keep |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| #include <rapidjson/document.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/scan_batch.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/external_mini_cluster-itest-base.h" |
| #include "kudu/integration-tests/mini_cluster_fs_inspector.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/tserver/tserver_admin.proxy.h" |
| #include "kudu/util/curl_util.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/pstack_watcher.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/subprocess.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using boost::none; |
| using kudu::client::KuduClient; |
| using kudu::client::KuduScanner; |
| using kudu::client::KuduScanBatch; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableCreator; |
| using kudu::cluster::ExternalTabletServer; |
| using kudu::consensus::COMMITTED_OPID; |
| using kudu::consensus::ConsensusMetadataPB; |
| using kudu::consensus::ConsensusStatePB; |
| using kudu::consensus::EXCLUDE_HEALTH_REPORT; |
| using kudu::consensus::RaftPeerPB; |
| using kudu::itest::AddServer; |
| using kudu::itest::DeleteTablet; |
| using kudu::itest::DeleteTabletWithRetries; |
| using kudu::itest::GetInt64Metric; |
| using kudu::itest::RemoveServer; |
| using kudu::itest::TServerDetails; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::tablet::TABLET_DATA_COPYING; |
| using kudu::tablet::TABLET_DATA_DELETED; |
| using kudu::tablet::TABLET_DATA_READY; |
| using kudu::tablet::TABLET_DATA_TOMBSTONED; |
| using kudu::tablet::TabletDataState; |
| using kudu::tablet::TabletSuperBlockPB; |
| using kudu::tserver::ListTabletsResponsePB; |
| using kudu::tserver::TabletServerErrorPB; |
| using std::numeric_limits; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| METRIC_DECLARE_entity(server); |
| METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_TSHeartbeat); |
| METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet); |
| |
| namespace kudu { |
| |
| class DeleteTableITest : public ExternalMiniClusterITestBase { |
| protected: |
| enum IsCMetaExpected { |
| CMETA_NOT_EXPECTED = 0, |
| CMETA_EXPECTED = 1 |
| }; |
| |
| enum IsSuperBlockExpected { |
| SUPERBLOCK_NOT_EXPECTED = 0, |
| SUPERBLOCK_EXPECTED = 1 |
| }; |
| |
| enum ErrorDumpStackSelector { |
| ON_ERROR_DO_NOT_DUMP_STACKS = 0, |
| ON_ERROR_DUMP_STACKS = 1, |
| }; |
| |
| // Get the UUID of the leader of the specified tablet, as seen by the TS with |
| // the given 'ts_uuid'. |
| string GetLeaderUUID(const string& ts_uuid, const string& tablet_id); |
| |
| Status CheckTabletTombstonedOrDeletedOnTS( |
| int index, |
| const string& tablet_id, |
| TabletDataState data_state, |
| IsCMetaExpected is_cmeta_expected, |
| IsSuperBlockExpected is_superblock_expected); |
| |
| Status CheckTabletTombstonedOnTS(int index, |
| const string& tablet_id, |
| IsCMetaExpected is_cmeta_expected); |
| |
| Status CheckTabletDeletedOnTS(int index, |
| const string& tablet_id, |
| IsSuperBlockExpected is_superblock_expected); |
| |
| void WaitForTabletTombstonedOnTS(int index, |
| const string& tablet_id, |
| IsCMetaExpected is_cmeta_expected); |
| |
| void WaitForTabletDeletedOnTS(int index, |
| const string& tablet_id, |
| IsSuperBlockExpected is_superblock_expected); |
| |
| void WaitForTSToCrash(int index); |
| void WaitForAllTSToCrash(); |
| void WaitUntilTabletRunning(int index, const std::string& tablet_id); |
| |
| // Delete the given table. If the operation times out, optionally dump |
| // the master stacks to help debug master-side deadlocks. |
| void DeleteTable(const string& table_name, |
| ErrorDumpStackSelector selector = ON_ERROR_DUMP_STACKS); |
| }; |
| |
| string DeleteTableITest::GetLeaderUUID(const string& ts_uuid, const string& tablet_id) { |
| ConsensusStatePB cstate; |
| CHECK_OK(itest::GetConsensusState(ts_map_[ts_uuid], tablet_id, MonoDelta::FromSeconds(10), |
| EXCLUDE_HEALTH_REPORT, &cstate)); |
| return cstate.leader_uuid(); |
| } |
| |
| Status DeleteTableITest::CheckTabletTombstonedOrDeletedOnTS( |
| int index, |
| const string& tablet_id, |
| TabletDataState data_state, |
| IsCMetaExpected is_cmeta_expected, |
| IsSuperBlockExpected is_superblock_expected) { |
| CHECK(data_state == TABLET_DATA_TOMBSTONED || data_state == TABLET_DATA_DELETED) << data_state; |
| // There should be no WALs and no cmeta. |
| if (inspect_->CountFilesInWALDirForTS(index, tablet_id) > 0) { |
| return Status::IllegalState("WAL segments exist for tablet", tablet_id); |
| } |
| if (is_cmeta_expected == CMETA_EXPECTED && |
| !inspect_->DoesConsensusMetaExistForTabletOnTS(index, tablet_id)) { |
| return Status::IllegalState("Expected cmeta for tablet " + tablet_id + " but it doesn't exist"); |
| } |
| if (is_superblock_expected == SUPERBLOCK_EXPECTED) { |
| RETURN_NOT_OK(inspect_->CheckTabletDataStateOnTS(index, tablet_id, { data_state })); |
| } else { |
| TabletSuperBlockPB superblock_pb; |
| Status s = inspect_->ReadTabletSuperBlockOnTS(index, tablet_id, &superblock_pb); |
| if (!s.IsNotFound()) { |
| return Status::IllegalState("Found unexpected superblock for tablet " + tablet_id); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status DeleteTableITest::CheckTabletTombstonedOnTS(int index, |
| const string& tablet_id, |
| IsCMetaExpected is_cmeta_expected) { |
| return CheckTabletTombstonedOrDeletedOnTS(index, tablet_id, TABLET_DATA_TOMBSTONED, |
| is_cmeta_expected, SUPERBLOCK_EXPECTED); |
| } |
| |
| Status DeleteTableITest::CheckTabletDeletedOnTS(int index, |
| const string& tablet_id, |
| IsSuperBlockExpected is_superblock_expected) { |
| return CheckTabletTombstonedOrDeletedOnTS(index, tablet_id, TABLET_DATA_DELETED, |
| CMETA_NOT_EXPECTED, is_superblock_expected); |
| } |
| |
| void DeleteTableITest::WaitForTabletTombstonedOnTS(int index, |
| const string& tablet_id, |
| IsCMetaExpected is_cmeta_expected) { |
| Status s; |
| for (int i = 0; i < 6000; i++) { |
| s = CheckTabletTombstonedOnTS(index, tablet_id, is_cmeta_expected); |
| if (s.ok()) return; |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| ASSERT_OK(s); |
| } |
| |
| void DeleteTableITest::WaitForTabletDeletedOnTS(int index, |
| const string& tablet_id, |
| IsSuperBlockExpected is_superblock_expected) { |
| Status s; |
| for (int i = 0; i < 6000; i++) { |
| s = CheckTabletDeletedOnTS(index, tablet_id, is_superblock_expected); |
| if (s.ok()) return; |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| ASSERT_OK(s); |
| } |
| |
| void DeleteTableITest::WaitForTSToCrash(int index) { |
| auto ts = cluster_->tablet_server(index); |
| SCOPED_TRACE(ts->instance_id().permanent_uuid()); |
| ASSERT_OK(ts->WaitForInjectedCrash(MonoDelta::FromSeconds(60))); |
| } |
| |
| void DeleteTableITest::WaitForAllTSToCrash() { |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| NO_FATALS(WaitForTSToCrash(i)); |
| } |
| } |
| |
| void DeleteTableITest::WaitUntilTabletRunning(int index, const std::string& tablet_id) { |
| ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(index)->uuid()], |
| tablet_id, MonoDelta::FromSeconds(60))); |
| } |
| |
| void DeleteTableITest::DeleteTable(const string& table_name, |
| ErrorDumpStackSelector selector) { |
| Status s = client_->DeleteTable(table_name); |
| if (s.IsTimedOut() && (ON_ERROR_DUMP_STACKS == selector)) { |
| WARN_NOT_OK(PstackWatcher::DumpPidStacks(cluster_->master()->pid()), |
| "Couldn't dump stacks"); |
| } |
| ASSERT_OK(s); |
| } |
| |
| // Test deleting an empty table, and ensure that the tablets get removed, |
| // and the master no longer shows the table as existing. |
| TEST_F(DeleteTableITest, TestDeleteEmptyTable) { |
| NO_FATALS(StartCluster()); |
| // Create a table on the cluster. We're just using TestWorkload |
| // as a convenient way to create it. |
| TestWorkload(cluster_.get()).Setup(); |
| |
| // The table should have replicas on all three tservers. |
| ASSERT_OK(inspect_->WaitForReplicaCount(3)); |
| |
| // Grab the tablet ID (used later). |
| vector<string> tablets = inspect_->ListTabletsOnTS(1); |
| ASSERT_EQ(1, tablets.size()); |
| const string& tablet_id = tablets[0]; |
| |
| // Delete it and wait for the replicas to get deleted. |
| // We should have no tablets at the filesystem layer after deleting the table. |
| NO_FATALS(DeleteTable(TestWorkload::kDefaultTableName)); |
| ASSERT_OK(inspect_->WaitForNoData()); |
| |
| // Check that the master no longer exposes the table in any way: |
| |
| // 1) Should not list it in ListTables. |
| vector<string> table_names; |
| ASSERT_OK(client_->ListTables(&table_names)); |
| ASSERT_TRUE(table_names.empty()) << "table still exposed in ListTables"; |
| |
| // 2) Should respond to GetTableSchema with a NotFound error. |
| KuduSchema schema; |
| Status s = client_->GetTableSchema(TestWorkload::kDefaultTableName, &schema); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| |
| // 3) Should return an error for GetTabletLocations RPCs. |
| { |
| rpc::RpcController rpc; |
| master::GetTabletLocationsRequestPB req; |
| master::GetTabletLocationsResponsePB resp; |
| rpc.set_timeout(MonoDelta::FromSeconds(10)); |
| req.add_tablet_ids()->assign(tablet_id); |
| req.set_intern_ts_infos_in_response(true); |
| ASSERT_OK(cluster_->master_proxy()->GetTabletLocations(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| ASSERT_EQ(1, resp.errors_size()); |
| ASSERT_STR_CONTAINS(SecureShortDebugString(resp.errors(0)), |
| "code: NOT_FOUND message: \"Tablet deleted: Table deleted"); |
| } |
| |
| // 4) The master 'dump-entities' page should not list the deleted table or tablets. |
| EasyCurl c; |
| faststring entities_buf; |
| ASSERT_OK(c.FetchURL(Substitute("http://$0/dump-entities", |
| cluster_->master()->bound_http_hostport().ToString()), |
| &entities_buf)); |
| rapidjson::Document doc; |
| doc.Parse<0>(entities_buf.ToString().c_str()); |
| ASSERT_EQ(0, doc["tables"].Size()); |
| ASSERT_EQ(0, doc["tablets"].Size()); |
| } |
| |
| // Test that a DeleteTablet RPC is rejected without a matching destination UUID. |
| TEST_F(DeleteTableITest, TestDeleteTabletDestUuidValidation) { |
| NO_FATALS(StartCluster()); |
| // Create a table on the cluster. We're just using TestWorkload |
| // as a convenient way to create it. |
| TestWorkload(cluster_.get()).Setup(); |
| ASSERT_OK(inspect_->WaitForReplicaCount(3)); |
| |
| vector<string> tablets = inspect_->ListTabletsOnTS(1); |
| ASSERT_EQ(1, tablets.size()); |
| const string& tablet_id = tablets[0]; |
| |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()]; |
| |
| tserver::DeleteTabletRequestPB req; |
| tserver::DeleteTabletResponsePB resp; |
| rpc::RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromSeconds(20)); |
| |
| req.set_dest_uuid("fake-uuid"); |
| req.set_tablet_id(tablet_id); |
| req.set_delete_type(TABLET_DATA_TOMBSTONED); |
| ASSERT_OK(ts->tserver_admin_proxy->DeleteTablet(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(tserver::TabletServerErrorPB::WRONG_SERVER_UUID, resp.error().code()) |
| << SecureShortDebugString(resp); |
| ASSERT_STR_CONTAINS(StatusFromPB(resp.error().status()).ToString(), |
| "Wrong destination UUID"); |
| } |
| |
| // Test the atomic CAS argument to DeleteTablet(). |
| TEST_F(DeleteTableITest, TestAtomicDeleteTablet) { |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| NO_FATALS(StartCluster()); |
| // Create a table on the cluster. We're just using TestWorkload |
| // as a convenient way to create it. |
| TestWorkload(cluster_.get()).Setup(); |
| |
| // The table should have replicas on all three tservers. |
| ASSERT_OK(inspect_->WaitForReplicaCount(3)); |
| |
| // Grab the tablet ID (used later). |
| vector<string> tablets = inspect_->ListTabletsOnTS(1); |
| ASSERT_EQ(1, tablets.size()); |
| const string& tablet_id = tablets[0]; |
| |
| const int kTsIndex = 0; |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()]; |
| |
| // The committed config starts off with an opid_index of -1, so choose something lower. |
| boost::optional<int64_t> opid_index(-2); |
| tserver::TabletServerErrorPB::Code error_code; |
| ASSERT_OK(itest::WaitUntilTabletRunning(ts, tablet_id, timeout)); |
| |
| Status s; |
| for (int i = 0; i < 100; i++) { |
| s = DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout, opid_index, |
| &error_code); |
| if (error_code == TabletServerErrorPB::CAS_FAILED) break; |
| // If we didn't get the expected CAS_FAILED error, it's OK to get 'TABLET_NOT_RUNNING' |
| // because the "creating" maintenance state persists just slightly after it starts to |
| // expose 'RUNNING' state in ListTablets() |
| ASSERT_EQ(TabletServerErrorPB::TABLET_NOT_RUNNING, error_code) |
| << "unexpected error: " << s.ToString(); |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| |
| ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code) << "unexpected error: " << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), "of -2 but the committed config has opid_index of -1"); |
| |
| // Now use the "latest", which is -1. |
| ASSERT_OK(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout, -1)); |
| inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_TOMBSTONED }); |
| |
| // Now that the tablet is already tombstoned, our opid_index should be |
| // ignored (because it's impossible to check it). |
| ASSERT_OK(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout, -9999)); |
| inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_TOMBSTONED }); |
| |
| // Same with TOMBSTONED -> DELETED. |
| ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_DELETED, timeout, -9999)); |
| inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_DELETED }); |
| } |
| |
| TEST_F(DeleteTableITest, TestDeleteTableWithConcurrentWrites) { |
| NO_FATALS(StartCluster()); |
| int n_iters = AllowSlowTests() ? 20 : 1; |
| for (int i = 0; i < n_iters; i++) { |
| LOG(INFO) << "Running iteration " << i; |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(Substitute("table-$0", i)); |
| |
| // We'll delete the table underneath the writers, so we expcted |
| // a NotFound error during the writes. |
| workload.set_not_found_allowed(true); |
| workload.Setup(); |
| |
| // Start the workload, and wait to see some rows actually inserted |
| workload.Start(); |
| while (workload.rows_inserted() < 100) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
| // Delete it and wait for the replicas to get deleted. |
| NO_FATALS(DeleteTable(workload.table_name())); |
| ASSERT_OK(inspect_->WaitForNoData()); |
| |
| // Sleep just a little longer to make sure client threads send |
| // requests to the missing tablets. |
| SleepFor(MonoDelta::FromMilliseconds(50)); |
| |
| workload.StopAndJoin(); |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| } |
| } |
| |
| // Test that a tablet replica is automatically tombstoned on startup if a local |
| // crash occurs in the middle of Tablet Copy. Additionally acts as a regression |
| // test for KUDU-1605. |
| TEST_F(DeleteTableITest, TestAutoTombstoneAfterCrashDuringTabletCopy) { |
| // Set up flags to flush frequently, so that we get data on disk. |
| const vector<string> ts_flags = { |
| "--flush_threshold_mb=0", |
| "--maintenance_manager_polling_interval_ms=100", |
| }; |
| const vector<string> master_flags = { |
| "--allow_unsafe_replication_factor=true", |
| // If running with the 3-4-3 replication scheme, the catalog manager |
| // controls replacement of replicas: it's necessary to disable that default |
| // behavior since this test manages replicas on its own. |
| "--catalog_manager_evict_excess_replicas=false", |
| "--master_add_server_when_underreplicated=false", |
| }; |
| NO_FATALS(StartCluster(ts_flags, master_flags)); |
| const MonoDelta timeout = MonoDelta::FromSeconds(10); |
| const int kTsIndex = 0; // We'll test with the first TS. |
| |
| // Shut down TS 1, 2, write some data to TS 0 alone. |
| cluster_->master()->Shutdown(); |
| cluster_->tablet_server(1)->Shutdown(); |
| cluster_->tablet_server(2)->Shutdown(); |
| ASSERT_OK(cluster_->master()->Restart()); |
| ASSERT_OK(cluster_->WaitForTabletServerCount(1, MonoDelta::FromSeconds(30))); |
| |
| // Set up a table which has a tablet only on TS 0. This will be used to test for |
| // "collateral damage" bugs where incorrect handling of the main test tablet |
| // accidentally removes blocks from another tablet. |
| // We use a sequential workload so that we just flush and don't compact. |
| // Thus we use a contiguous set of block IDs starting with 1, with no "holes" |
| // from deleted blocks. |
| string unaffected_tablet_id; |
| TestWorkload unrepl_workload(cluster_.get()); |
| { |
| unrepl_workload.set_num_replicas(1); |
| unrepl_workload.set_table_name("other-table"); |
| unrepl_workload.set_num_write_threads(1); |
| unrepl_workload.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); |
| unrepl_workload.Setup(); |
| |
| // Figure out the tablet ID. |
| vector<string> tablets = inspect_->ListTabletsOnTS(kTsIndex); |
| ASSERT_EQ(1, tablets.size()); |
| unaffected_tablet_id = tablets[0]; |
| |
| // Write rows until it has flushed a few times. |
| unrepl_workload.Start(); |
| TabletSuperBlockPB sb; |
| while (sb.rowsets().size() < 3) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kTsIndex, unaffected_tablet_id, &sb)); |
| } |
| unrepl_workload.StopAndJoin(); |
| } |
| |
| // Restart the other two servers. |
| ASSERT_OK(cluster_->tablet_server(1)->Restart()); |
| ASSERT_OK(cluster_->tablet_server(2)->Restart()); |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| |
| // Restart the master to be sure that it only sees the live servers. |
| // Otherwise it may try to create a tablet with a replica on the down server. |
| // The table creation would eventually succeed after picking a different set of |
| // replicas, but not before causing a timeout. |
| cluster_->master()->Shutdown(); |
| ASSERT_OK(cluster_->master()->Restart()); |
| ASSERT_OK(cluster_->WaitForTabletServerCount(2, MonoDelta::FromSeconds(30))); |
| |
| // Create a new table with a single tablet replicated on the other two servers. |
| // We use the same sequential workload. This produces block ID sequences |
| // that look like: |
| // TS 0: |---- blocks from 'other-table' ---] |
| // TS 1: |---- blocks from default workload table ---] |
| // TS 2: |---- blocks from default workload table ---] |
| // So, if we were to accidentally reuse or delete block IDs from TS 1 or TS 2 when |
| // copying a tablet to TS 0, we'd surely notice it! |
| string replicated_tablet_id; |
| |
| TestWorkload repl_workload(cluster_.get()); |
| { |
| repl_workload.set_num_replicas(2); |
| repl_workload.set_num_write_threads(1); |
| repl_workload.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); |
| repl_workload.Setup(); |
| ASSERT_OK(inspect_->WaitForReplicaCount(3)); // 1 original plus 2 new replicas. |
| |
| // Figure out the tablet id of the new table. |
| vector<string> tablets = inspect_->ListTabletsOnTS(1); |
| ASSERT_EQ(1, tablets.size()); |
| replicated_tablet_id = tablets[0]; |
| |
| repl_workload.Start(); |
| TabletSuperBlockPB sb; |
| while (sb.rowsets().size() < 3) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(1, replicated_tablet_id, &sb)); |
| } |
| repl_workload.StopAndJoin(); |
| } |
| |
| // Enable a fault crash when Tablet Copy occurs on TS 0. |
| cluster_->tablet_server(kTsIndex)->mutable_flags()->push_back( |
| "--fault_crash_after_tc_files_fetched=1.0"); |
| |
| // Restart TS 0 and add it to the config. It will crash when tablet copy starts. |
| ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart()); |
| string leader_uuid = GetLeaderUUID(cluster_->tablet_server(1)->uuid(), replicated_tablet_id); |
| TServerDetails* leader = DCHECK_NOTNULL(ts_map_[leader_uuid]); |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()]; |
| ASSERT_OK(AddServer(leader, replicated_tablet_id, ts, RaftPeerPB::VOTER, timeout)); |
| NO_FATALS(WaitForTSToCrash(kTsIndex)); |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| |
| LOG(INFO) << "Test progress: crashed on first attempt to copy"; |
| |
| // The superblock should be in TABLET_DATA_COPYING state on disk. |
| NO_FATALS(inspect_->CheckTabletDataStateOnTS(kTsIndex, replicated_tablet_id, |
| { TABLET_DATA_COPYING })); |
| |
| // Restart and let it crash one more time. |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart()); |
| NO_FATALS(WaitForTSToCrash(kTsIndex)); |
| LOG(INFO) << "Test progress: crashed on second attempt to copy"; |
| |
| // Remove the fault flag and let it successfully copy the tablet. |
| cluster_->tablet_server(kTsIndex)->mutable_flags()->pop_back(); |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart()); |
| |
| // Everything should be consistent now. |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(repl_workload.table_name(), ClusterVerifier::AT_LEAST, |
| repl_workload.rows_inserted())); |
| NO_FATALS(v.CheckRowCount(unrepl_workload.table_name(), ClusterVerifier::AT_LEAST, |
| unrepl_workload.rows_inserted())); |
| |
| // Now we want to test the case where crashed while copying over a previously-tombstoned tablet. |
| // So, we first remove the server, causing it to get tombstoned. |
| ASSERT_OK(RemoveServer(leader, replicated_tablet_id, ts, timeout)); |
| NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, replicated_tablet_id, CMETA_EXPECTED)); |
| |
| // ... and then add it back, with the fault runtime-enabled. |
| ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kTsIndex), |
| "fault_crash_after_tc_files_fetched", "1.0")); |
| ASSERT_OK(AddServer(leader, replicated_tablet_id, ts, RaftPeerPB::VOTER, timeout)); |
| NO_FATALS(WaitForTSToCrash(kTsIndex)); |
| LOG(INFO) << "Test progress: crashed on attempt to copy over tombstoned"; |
| |
| // Finally, crashing with no fault flags should fully recover again. |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(repl_workload.table_name(), ClusterVerifier::AT_LEAST, |
| repl_workload.rows_inserted())); |
| NO_FATALS(v.CheckRowCount(unrepl_workload.table_name(), ClusterVerifier::AT_LEAST, |
| unrepl_workload.rows_inserted())); |
| } |
| |
| // Test that a tablet replica automatically tombstones itself if the remote |
| // server fails in the middle of the Tablet Copy process. |
| // Also test that we can Copy Tablet over a tombstoned tablet. |
| TEST_F(DeleteTableITest, TestAutoTombstoneAfterTabletCopyRemoteFails) { |
| const vector<string> ts_flags = { |
| "--enable_leader_failure_detection=false", // Make test deterministic. |
| "--log_segment_size_mb=1", // Faster log rolls. |
| "--log_compression_codec=NO_COMPRESSION", // Faster log rolls. |
| }; |
| const vector<string> master_flags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| "--allow_unsafe_replication_factor=true", |
| // If running with the 3-4-3 replication scheme, the catalog manager |
| // controls replacement of replicas: it's necessary to disable that default |
| // behavior since this test manages replicas on its own. |
| "--catalog_manager_evict_excess_replicas=false", |
| "--master_add_server_when_underreplicated=false", |
| }; |
| NO_FATALS(StartCluster(ts_flags, master_flags)); |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(20); |
| const int kTsIndex = 0; // We'll test with the first TS. |
| |
| // We'll do a config change to Tablet Copy a replica here later. For |
| // now, shut down TS-0. |
| LOG(INFO) << "Shutting down TS " << cluster_->tablet_server(kTsIndex)->uuid(); |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| |
| // Bounce the Master so it gets new tablet reports and doesn't try to assign |
| // a replica to the dead TS. |
| cluster_->master()->Shutdown(); |
| ASSERT_OK(cluster_->master()->Restart()); |
| cluster_->WaitForTabletServerCount(2, kTimeout); |
| |
| // Start a workload on the cluster, and run it for a little while. |
| TestWorkload workload(cluster_.get()); |
| workload.set_num_replicas(2); |
| workload.Setup(); |
| ASSERT_OK(inspect_->WaitForReplicaCount(2)); |
| |
| // Figure out the tablet id. |
| vector<string> tablets = inspect_->ListTabletsOnTS(1); |
| ASSERT_EQ(1, tablets.size()); |
| const string& tablet_id = tablets[0]; |
| |
| for (int i = 1; i <= 2; i++) { |
| NO_FATALS(WaitUntilTabletRunning(i, tablet_id)); |
| } |
| |
| // Elect a leader and run some data through the cluster. |
| const int kLeaderIndex = 1; |
| string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid(); |
| ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, kTimeout)); |
| workload.Start(); |
| while (workload.rows_inserted() < 100) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
| // Tablet Copy doesn't see the active WAL segment, and we need to |
| // download a file to trigger the fault in this test. Due to the log index |
| // chunks, that means 3 files minimum: One in-flight WAL segment, one index |
| // chunk file (these files grow much more slowly than the WAL segments), and |
| // one completed WAL segment. |
| ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kLeaderIndex, tablet_id, 3)); |
| workload.StopAndJoin(); |
| |
| // Cause the leader to crash when a follower tries to initiate Tablet Copy from it. |
| const string& fault_flag = "fault_crash_on_handle_tc_fetch_data"; |
| ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kLeaderIndex), fault_flag, "1.0")); |
| |
| // Add TS-0 as a new member to the config and wait for the leader to crash. |
| ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart()); |
| TServerDetails* leader = ts_map_[kLeaderUuid]; |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()]; |
| // The server may crash before responding to our RPC. |
| Status s = AddServer(leader, tablet_id, ts, RaftPeerPB::VOTER, kTimeout); |
| ASSERT_TRUE(s.ok() || s.IsNetworkError()) << s.ToString(); |
| NO_FATALS(WaitForTSToCrash(kLeaderIndex)); |
| |
| // The tablet server will detect that the leader failed, and automatically |
| // tombstone its replica. Shut down the other non-leader replica to avoid |
| // interference while we wait for this to happen. |
| cluster_->tablet_server(1)->Shutdown(); |
| cluster_->tablet_server(2)->Shutdown(); |
| NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_NOT_EXPECTED)); |
| |
| // Check the textual status message for the failed copy. |
| { |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> status_pbs; |
| ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, kTimeout, &status_pbs)); |
| ASSERT_STR_MATCHES(status_pbs[0].tablet_status().last_status(), |
| "Tablet Copy: Tombstoned tablet .*: Tablet copy aborted"); |
| } |
| |
| // Now bring the other replicas back, re-elect the previous leader (TS-1), |
| // and wait for the leader to Tablet Copy the tombstoned replica. This |
| // will have replaced a tablet with no consensus metadata. |
| ASSERT_OK(cluster_->tablet_server(1)->Restart()); |
| ASSERT_OK(cluster_->tablet_server(2)->Restart()); |
| for (int i = 1; i <= 2; i++) { |
| NO_FATALS(WaitUntilTabletRunning(i, tablet_id)); |
| } |
| ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, kTimeout)); |
| ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY })); |
| |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST, |
| workload.rows_inserted())); |
| |
| // Now pause the other replicas and tombstone our replica again. |
| ASSERT_OK(cluster_->tablet_server(1)->Pause()); |
| ASSERT_OK(cluster_->tablet_server(2)->Pause()); |
| ASSERT_OK(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, kTimeout)); |
| NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_NOT_EXPECTED)); |
| |
| // Bring them back again, let them yet again Copy a new replica on top of our tombstoned replica. |
| // This time, the leader will have replaced a tablet with consensus metadata. |
| ASSERT_OK(cluster_->tablet_server(1)->Resume()); |
| ASSERT_OK(cluster_->tablet_server(2)->Resume()); |
| ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY })); |
| |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST, |
| workload.rows_inserted())); |
| } |
| |
// Test for correct Tablet Copy merge of consensus metadata.
//
// Scenario:
//  1. TS 0 records a self-vote in term 2 while its peers are down.
//  2. TS 0 is tombstoned and then re-copied from a leader re-elected at
//     term 2. The test expects TS 0 to still report term 2 with its own vote.
//  3. TS 0 is tombstoned again while the leader is re-elected at term 3.
//     After the second copy, the test expects term 3 with no recorded vote,
//     since TS 0 did not take part in that election.
TEST_F(DeleteTableITest, TestMergeConsensusMetadata) {
  // Enable manual leader selection.
  vector<string> ts_flags, master_flags;
  ts_flags.emplace_back("--enable_leader_failure_detection=false");
  master_flags.emplace_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
  NO_FATALS(StartCluster(ts_flags, master_flags));
  const MonoDelta timeout = MonoDelta::FromSeconds(10);
  const int kTsIndex = 0;

  TestWorkload workload(cluster_.get());
  workload.Setup();
  ASSERT_OK(inspect_->WaitForReplicaCount(3));

  // Figure out the tablet id to Tablet Copy.
  vector<string> tablets = inspect_->ListTabletsOnTS(1);
  ASSERT_EQ(1, tablets.size());
  const string& tablet_id = tablets[0];

  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    NO_FATALS(WaitUntilTabletRunning(i, tablet_id));
  }

  // Elect a leader and run some data through the cluster.
  int leader_index = 1;
  string leader_uuid = cluster_->tablet_server(leader_index)->uuid();
  ASSERT_OK(itest::StartElection(ts_map_[leader_uuid], tablet_id, timeout));
  workload.Start();
  while (workload.rows_inserted() < 100) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();
  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));

  // Verify that TS 0 voted for the chosen leader.
  ConsensusMetadataPB cmeta_pb;
  ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
  ASSERT_EQ(1, cmeta_pb.current_term());
  ASSERT_EQ(leader_uuid, cmeta_pb.voted_for());

  // Shut down all but TS 0 and try to elect TS 0. The election will fail but
  // the TS will record a vote for itself as well as a new term (term 2).
  cluster_->tablet_server(1)->Shutdown();
  cluster_->tablet_server(2)->Shutdown();
  NO_FATALS(WaitUntilTabletRunning(kTsIndex, tablet_id));
  TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
  ASSERT_OK(itest::StartElection(ts, tablet_id, timeout));
  // Poll the on-disk consensus metadata (up to 6000 x 10ms) until the term-2
  // self-vote shows up. Failed reads are simply retried; the asserts after
  // the loop check whatever 'cmeta_pb' holds when the loop ends.
  for (int i = 0; i < 6000; i++) {
    Status s = inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb);
    if (s.ok() &&
        cmeta_pb.current_term() == 2 &&
        cmeta_pb.voted_for() == ts->uuid()) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  ASSERT_EQ(2, cmeta_pb.current_term());
  ASSERT_EQ(ts->uuid(), cmeta_pb.voted_for());

  // Tombstone TS 0's replica (keeping its consensus metadata), then shut
  // TS 0 down.
  ASSERT_OK(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout));
  NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
  cluster_->tablet_server(kTsIndex)->Shutdown();

  // Restart the other tablet servers and re-elect the same leader.
  ASSERT_OK(cluster_->tablet_server(1)->Restart());
  ASSERT_OK(cluster_->tablet_server(2)->Restart());
  TServerDetails* leader = ts_map_[leader_uuid];
  NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
  NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
  ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
  ASSERT_OK(itest::WaitUntilLeader(leader, tablet_id, timeout));

  // Bring TS 0 back up and wait until its tombstoned replica has been
  // replaced via Tablet Copy.
  LOG(INFO) << "Bringing TS " << cluster_->tablet_server(kTsIndex)->uuid()
            << " back up...";
  ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));

  // Assert that the election history is retained (voted for self).
  ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
  ASSERT_EQ(2, cmeta_pb.current_term());
  ASSERT_EQ(ts->uuid(), cmeta_pb.voted_for());

  // Now do the same thing as above, where we tombstone TS 0 then trigger a new
  // term (term 3) on the other machines. TS 0 will get copied
  // again, but this time the vote record on TS 0 for term 2 should not be
  // retained after Tablet Copy occurs.
  cluster_->tablet_server(1)->Shutdown();
  cluster_->tablet_server(2)->Shutdown();

  // Delete with retries because the tablet might still be bootstrapping.
  ASSERT_OK(DeleteTabletWithRetries(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout));
  NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));

  // Shut down the tablet server so it won't vote while tombstoned.
  cluster_->tablet_server(kTsIndex)->Shutdown();

  ASSERT_OK(cluster_->tablet_server(1)->Restart());
  ASSERT_OK(cluster_->tablet_server(2)->Restart());
  NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
  NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
  ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
  ASSERT_OK(itest::WaitUntilLeader(leader, tablet_id, timeout));

  // Now restart the replica. It will get tablet copied by the leader.
  ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));

  // The election history should have been wiped out for the new term, since
  // this node did not participate.
  ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
  ASSERT_EQ(3, cmeta_pb.current_term());
  ASSERT_TRUE(!cmeta_pb.has_voted_for()) << SecureShortDebugString(cmeta_pb);
}
| |
| // Regression test for KUDU-987, a bug where followers with ops in REPLICATING |
| // state, which means they have not yet been committed to a majority, cannot |
| // shut down during a DeleteTablet() call. |
| TEST_F(DeleteTableITest, TestDeleteFollowerWithReplicatingOps) { |
| if (!AllowSlowTests()) { |
| // We will typically wait at least 5 seconds for timeouts to occur. |
| LOG(INFO) << "Skipping test in fast-test mode."; |
| return; |
| } |
| |
| const MonoDelta timeout = MonoDelta::FromSeconds(10); |
| |
| const int kNumTabletServers = 5; |
| vector<string> ts_flags, master_flags; |
| ts_flags.emplace_back("--enable_leader_failure_detection=false"); |
| ts_flags.emplace_back("--flush_threshold_mb=0"); // Always be flushing. |
| ts_flags.emplace_back("--maintenance_manager_polling_interval_ms=100"); |
| master_flags.emplace_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false"); |
| NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers)); |
| |
| const int kTsIndex = 0; // We'll test with the first TS. |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()]; |
| |
| // Create the table. |
| TestWorkload workload(cluster_.get()); |
| workload.set_num_replicas(kNumTabletServers); |
| workload.Setup(); |
| |
| // Figure out the tablet ids of the created tablets. |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; |
| ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets)); |
| const string& tablet_id = tablets[0].tablet_status().tablet_id(); |
| |
| // Wait until all replicas are up and running. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()], |
| tablet_id, timeout)); |
| } |
| |
| // Elect TS 1 as leader. |
| const int kLeaderIndex = 1; |
| const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid(); |
| TServerDetails* leader = ts_map_[kLeaderUuid]; |
| ASSERT_OK(itest::StartElection(leader, tablet_id, timeout)); |
| ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1)); |
| |
| // Kill a majority, but leave the leader and a single follower. |
| LOG(INFO) << "Killing majority"; |
| for (int i = 2; i < kNumTabletServers; i++) { |
| cluster_->tablet_server(i)->Shutdown(); |
| } |
| |
| // Now write a single row to the leader. |
| // We give 5 seconds for the timeout to pretty much guarantee that a flush |
| // will occur due to the low flush threshold we set. |
| LOG(INFO) << "Writing a row"; |
| Status s = WriteSimpleTestRow(leader, tablet_id, RowOperationsPB::INSERT, |
| 1, 1, "hola, world", MonoDelta::FromSeconds(5)); |
| ASSERT_TRUE(s.IsTimedOut()); |
| ASSERT_STR_CONTAINS(s.ToString(), "timed out"); |
| |
| LOG(INFO) << "Killing the leader..."; |
| cluster_->tablet_server(kLeaderIndex)->Shutdown(); |
| |
| // Now tombstone the follower tablet. This should succeed even though there |
| // are uncommitted operations on the replica. |
| LOG(INFO) << "Tombstoning tablet " << tablet_id << " on TS " << ts->uuid(); |
| ASSERT_OK(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout)); |
| } |
| |
// Test that orphaned blocks are cleared from the superblock when a tablet is
// tombstoned.
//
// A follower (TS 0) receives writes until its superblock lists at least one
// flushed rowset. The leader is then shut down and the follower is
// tombstoned. After that, its superblock must list no rowsets and no
// orphaned blocks.
TEST_F(DeleteTableITest, TestOrphanedBlocksClearedOnDelete) {
  const MonoDelta timeout = MonoDelta::FromSeconds(30);
  vector<string> ts_flags = {
    "--enable_leader_failure_detection=false",
    "--flush_threshold_mb=0", // Flush quickly since we wait for a flush to occur.
    "--maintenance_manager_polling_interval_ms=100"
  };
  vector<string> master_flags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
  };
  NO_FATALS(StartCluster(ts_flags, master_flags));

  const int kFollowerIndex = 0;
  TServerDetails* follower_ts = ts_map_[cluster_->tablet_server(kFollowerIndex)->uuid()];

  // Create the table.
  TestWorkload workload(cluster_.get());
  workload.Setup();

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(follower_ts, 1, timeout, &tablets));
  const string& tablet_id = tablets[0].tablet_status().tablet_id();

  // Wait until all replicas are up and running.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id, timeout));
  }

  // Elect TS 1 as leader.
  const int kLeaderIndex = 1;
  const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
  TServerDetails* leader_ts = ts_map_[kLeaderUuid];
  ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));

  // Run a write workload and wait until we see some rowsets flush on the follower.
  // The follower's superblock is polled every 10ms, for at most 3000 attempts.
  workload.Start();
  TabletSuperBlockPB superblock_pb;
  for (int i = 0; i < 3000; i++) {
    ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kFollowerIndex, tablet_id, &superblock_pb));
    if (!superblock_pb.rowsets().empty()) break;
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  ASSERT_GT(superblock_pb.rowsets_size(), 0)
      << "Timed out waiting for rowset flush on TS " << follower_ts->uuid() << ": "
      << "Superblock:\n" << SecureDebugString(superblock_pb);

  // Shut down the leader so it doesn't try to copy a new replica to our follower later.
  workload.StopAndJoin();
  cluster_->tablet_server(kLeaderIndex)->Shutdown();

  // Tombstone the follower and check that there are no rowsets or orphaned
  // blocks retained in the superblock.
  ASSERT_OK(DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout));
  NO_FATALS(WaitForTabletTombstonedOnTS(kFollowerIndex, tablet_id, CMETA_EXPECTED));
  ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kFollowerIndex, tablet_id, &superblock_pb));
  ASSERT_EQ(0, superblock_pb.rowsets_size()) << SecureDebugString(superblock_pb);
  ASSERT_EQ(0, superblock_pb.orphaned_blocks_size()) << SecureDebugString(superblock_pb);
}
| |
// Returns pointers to every element of 'haystack' that contains 'needle' as a
// substring, in their original order. The pointers alias elements of
// 'haystack', so they are only valid while 'haystack' is alive and unmodified.
vector<const string*> Grep(const string& needle, const vector<string>& haystack) {
  vector<const string*> matches;
  for (auto it = haystack.begin(); it != haystack.end(); ++it) {
    if (it->find(needle) == string::npos) {
      continue;
    }
    matches.push_back(&*it);
  }
  return matches;
}
| |
| vector<string> ListOpenFiles(pid_t pid) { |
| string cmd = strings::Substitute("export PATH=$$PATH:/usr/bin:/usr/sbin; lsof -n -p $0", pid); |
| vector<string> argv = { "bash", "-c", cmd }; |
| string out; |
| CHECK_OK(Subprocess::Call(argv, "", &out)); |
| vector<string> lines = strings::Split(out, "\n"); |
| return lines; |
| } |
| |
| int PrintOpenTabletFiles(pid_t pid, const string& tablet_id) { |
| vector<string> lines = ListOpenFiles(pid); |
| vector<const string*> wal_lines = Grep(tablet_id, lines); |
| LOG(INFO) << "There are " << wal_lines.size() << " open WAL files for pid " << pid << ":"; |
| for (const string* l : wal_lines) { |
| LOG(INFO) << *l; |
| } |
| return wal_lines.size(); |
| } |
| |
// Regression test for tablet deletion FD leak. See KUDU-1288.
//
// Runs a workload on a single-tserver cluster and then tombstones its only
// tablet. It then checks with lsof (via PrintOpenTabletFiles()) that no
// open-file line of the tserver process mentions the tablet id. The check
// runs right after the tombstone and again after a tserver restart.
TEST_F(DeleteTableITest, TestFDsNotLeakedOnTabletTombstone) {
  const MonoDelta timeout = MonoDelta::FromSeconds(30);

  NO_FATALS(StartCluster({}, {}, 1));

  // Create the table and write at least 1000 rows so the tablet has
  // on-disk state.
  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(1);
  workload.Setup();
  workload.Start();
  while (workload.rows_inserted() < 1000) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
  const string& tablet_id = tablets[0].tablet_status().tablet_id();

  // Tombstone the tablet and then ensure that lsof does not list any
  // tablet-related paths.
  ExternalTabletServer* ets = cluster_->tablet_server(0);
  ASSERT_OK(DeleteTablet(ts_map_[ets->uuid()],
                         tablet_id, TABLET_DATA_TOMBSTONED, timeout));
  ASSERT_EQ(0, PrintOpenTabletFiles(ets->pid(), tablet_id));

  // Restart the TS after deletion and then do the same lsof check again.
  ets->Shutdown();
  ASSERT_OK(ets->Restart());
  ASSERT_EQ(0, PrintOpenTabletFiles(ets->pid(), tablet_id));
}
| |
// Regression test for KUDU-1545: crash when visiting the tablet page for a
// tombstoned tablet.
//
// Tombstones the only tablet of a single-tserver cluster, then fetches each
// tablet-specific web page for it. Every fetch must succeed, and the
// response body must contain the tablet id.
TEST_F(DeleteTableITest, TestWebPageForTombstonedTablet) {
  const MonoDelta timeout = MonoDelta::FromSeconds(30);

  NO_FATALS(StartCluster({}, {}, 1));

  // Create the table.
  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(1);
  workload.Setup();

  // Figure out the tablet id of the created tablet.
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
  const string& tablet_id = tablets[0].tablet_status().tablet_id();

  // Tombstone the tablet. DeleteTablet() is retried until it succeeds,
  // presumably because the freshly created replica may not yet be ready to
  // accept the request.
  ExternalTabletServer* ets = cluster_->tablet_server(0);
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(DeleteTablet(ts_map_[ets->uuid()], tablet_id,
                           TABLET_DATA_TOMBSTONED, timeout));
  });

  // Check the various web pages associated with the tablet, ensuring
  // they don't crash and at least have the tablet ID within them.
  EasyCurl c;
  const auto& pages = { "tablet",
                        "tablet-rowsetlayout-svg",
                        "tablet-consensus-status",
                        "log-anchors" };
  for (const auto& page : pages) {
    faststring buf;
    ASSERT_OK(c.FetchURL(Substitute(
        "http://$0/$1?id=$2",
        cluster_->tablet_server(0)->bound_http_hostport().ToString(),
        page,
        tablet_id), &buf));
    ASSERT_STR_CONTAINS(buf.ToString(), tablet_id)
        << "Page: " << page << "; tablet_id: " << tablet_id;
  }
}
| |
// Checks that the master does not delete a tablet it no longer knows about.
// The master's on-disk data is wiped and the master is restarted, so the
// tablet created below becomes unknown to it. The tserver's DeleteTablet
// handler-latency histogram total_count must stay at 0 after the tserver
// re-registers.
TEST_F(DeleteTableITest, TestUnknownTabletsAreNotDeleted) {
  //
  // NOTE on disabled RPC authentication and encryption:
  // This test scenario would be flaky if the master/tserver authentication
  // were done via TLS certificates. That's because the scenario involves
  // removing master's data directory along with the IPKI information. Once
  // the master re-generates its IPKI system records and starts using the new
  // TLS server certificate signed by the newly generated CA private key,
  // the tserver fails to verify the new master's certificate using the old CA
  // certificate.
  //
  constexpr int kNumTabletServers = 1;
  const vector<string> extra_ts_flags = {
    // Speed up heartbeating so that the unknown tablet is detected faster.
    "--heartbeat_interval_ms=10",

    // See the note above on disabled RPC authentication and encryption.
    "--rpc_authentication=disabled",
    "--rpc_encryption=disabled",
  };
  const vector<string> extra_master_flags = {
    // See the note above on disabled RPC authentication and encryption.
    "--rpc_authentication=disabled",
    "--rpc_encryption=disabled",
  };

  NO_FATALS(StartCluster(extra_ts_flags, extra_master_flags, kNumTabletServers));

  Schema schema(GetSimpleTestSchema());
  client::KuduSchema client_schema(client::KuduSchema::FromSchema(schema));
  unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
  ASSERT_OK(creator->table_name("test")
            .schema(&client_schema)
            .set_range_partition_columns({"key"})
            .num_replicas(1)
            .Create());

  // Delete the master's metadata and start it back up. The tablet created
  // above is now unknown, but should not be deleted!
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->DeleteFromDisk());
  ASSERT_OK(cluster_->master()->Restart());

  // Let's wait for tablet server registration with the master: it guarantees
  // at least one heartbeat is processed by the master.
  ASSERT_OK(cluster_->WaitForTabletServerCount(kNumTabletServers,
                                               MonoDelta::FromSeconds(30)));
  // The master should not delete the tablet: the tserver must not have
  // received a single DeleteTablet RPC.
  int64_t num_delete_attempts;
  ASSERT_OK(GetInt64Metric(
      cluster_->tablet_server(0)->bound_http_hostport(),
      &METRIC_ENTITY_server, "kudu.tabletserver",
      &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet,
      "total_count", &num_delete_attempts));
  ASSERT_EQ(0, num_delete_attempts);
}
| |
| // Ensure that the master doesn't try to delete tombstoned tablets. |
| // Regression test for KUDU-2114. |
| TEST_F(DeleteTableITest, TestNoDeleteTombstonedTablets) { |
| if (!AllowSlowTests()) { |
| LOG(WARNING) << "This test sleeps for several seconds and only runs in slow-test mode"; |
| return; |
| } |
| |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| const vector<string> master_flags = { |
| // If running with the 3-4-3 replication scheme, the catalog manager |
| // controls replacement of replicas: it's necessary to disable that default |
| // behavior since this test manages replicas on its own. |
| "--catalog_manager_evict_excess_replicas=false", |
| "--master_add_server_when_underreplicated=false", |
| }; |
| NO_FATALS(StartCluster({}, master_flags, /*num_tablet_servers=*/ 4)); |
| const int kNumReplicas = 3; |
| |
| // Create a table on the cluster. We're just using TestWorkload |
| // as a convenient way to create it. |
| TestWorkload(cluster_.get()).Setup(); |
| |
| // The table should have replicas on three tservers. |
| ASSERT_OK(inspect_->WaitForReplicaCount(kNumReplicas)); |
| master::GetTableLocationsResponsePB table_locations; |
| ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName, |
| kTimeout, master::VOTER_REPLICA, /*table_id=*/none, |
| &table_locations)); |
| ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet. |
| string tablet_id; |
| std::set<string> replicas; |
| for (const auto& t : table_locations.tablet_locations()) { |
| tablet_id = t.tablet_id(); |
| for (const auto& r : t.interned_replicas()) { |
| replicas.insert(table_locations.ts_infos(r.ts_info_idx()).permanent_uuid()); |
| } |
| } |
| |
| LOG(INFO) << "finding leader..."; |
| |
| // Find leader. |
| TServerDetails* leader = nullptr; |
| ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader)); |
| ASSERT_OK(WaitForOpFromCurrentTerm(leader, tablet_id, COMMITTED_OPID, kTimeout)); |
| |
| LOG(INFO) << "replacing replica..."; |
| |
| // Prepare to replace a replica. |
| TServerDetails* to_add = nullptr; |
| TServerDetails* to_remove = nullptr; |
| int to_remove_index = -1; |
| ASSERT_NE(nullptr, leader); |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(i)->uuid()]; |
| if (!ContainsKey(replicas, cluster_->tablet_server(i)->uuid())) { |
| to_add = ts; |
| } else { |
| if (ts == leader) continue; |
| to_remove = ts; |
| to_remove_index = i; |
| } |
| } |
| ASSERT_NE(nullptr, to_add); |
| ASSERT_NE(nullptr, to_remove); |
| |
| // Do the config changes and wait for the master to delete the old node. |
| ASSERT_OK(AddServer(leader, tablet_id, to_add, RaftPeerPB::VOTER, kTimeout)); |
| ASSERT_OK(RemoveServer(leader, tablet_id, to_remove, kTimeout)); |
| |
| LOG(INFO) << "waiting for no data on ts..."; |
| |
| // Most of the time, we will only get 1 deletion attempt when a replica is |
| // evicted, but to avoid flakiness in this test we leave some slack for a |
| // couple more due to multiple requests coming in at once, such as the config |
| // change plus a couple of heartbeats from the evicted node. If we set this |
| // to 1, this test ends up being a few percent flaky under stress. |
| const int kMaxDeleteAttemptsPerEviction = 3; |
| |
| // 'to_remove' should eventually have no tablets and the metrics should show |
| // a deleted tablet. |
| ASSERT_EVENTUALLY([&] { |
| inspect_->WaitForNoDataOnTS(to_remove_index, kTimeout); |
| |
| int64_t num_delete_attempts; |
| ASSERT_OK(GetInt64Metric( |
| cluster_->tablet_server(to_remove_index)->bound_http_hostport(), |
| &METRIC_ENTITY_server, "kudu.tabletserver", |
| &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet, |
| "total_count", &num_delete_attempts)); |
| ASSERT_GE(num_delete_attempts, 1); |
| ASSERT_LE(num_delete_attempts, kMaxDeleteAttemptsPerEviction); |
| }); |
| |
| // Wait for the metrics to change without restarting, and then do the same |
| // check after a restart, to test both the code paths in the master (one |
| // related to config change, one related to receiving a full tablet report). |
| for (int i = 0; i < 2; i++) { |
| LOG(INFO) << "iter " << i << "..."; |
| int max_expected_deletes = kMaxDeleteAttemptsPerEviction; |
| if (i == 1) { |
| LOG(INFO) << "restarting cluster..."; |
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| max_expected_deletes = 0; // There should be no attempts in this case. |
| } |
| |
| LOG(INFO) << "waiting for metrics..."; |
| |
| SCOPED_TRACE(i); |
| SCOPED_TRACE(max_expected_deletes); |
| |
| int64_t prev_heartbeats; |
| ASSERT_OK(GetInt64Metric( |
| cluster_->master()->bound_http_hostport(), |
| &METRIC_ENTITY_server, "kudu.master", |
| &METRIC_handler_latency_kudu_master_MasterService_TSHeartbeat, |
| "total_count", |
| &prev_heartbeats)); |
| |
| // Wait until several heartbeats have been received by the master. |
| // We should still have the expected number of delete requests. |
| SCOPED_LOG_TIMING(INFO, "waiting for heartbeats"); |
| ASSERT_EVENTUALLY([&] { |
| int64_t num_delete_attempts; |
| ASSERT_OK(GetInt64Metric( |
| cluster_->tablet_server(to_remove_index)->bound_http_hostport(), |
| &METRIC_ENTITY_server, "kudu.tabletserver", |
| &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet, |
| "total_count", |
| &num_delete_attempts)); |
| ASSERT_LE(num_delete_attempts, max_expected_deletes); |
| |
| int64_t num_heartbeats; |
| ASSERT_OK(GetInt64Metric( |
| cluster_->master()->bound_http_hostport(), |
| &METRIC_ENTITY_server, "kudu.master", |
| &METRIC_handler_latency_kudu_master_MasterService_TSHeartbeat, |
| "total_count", |
| &num_heartbeats)); |
| const int kHeartbeatsToWait = 3; |
| ASSERT_GE(num_heartbeats, prev_heartbeats + (kNumReplicas * kHeartbeatsToWait)); |
| }); |
| } |
| } |
| |
| // Parameterized test case for TABLET_DATA_DELETED deletions. |
| class DeleteTableDeletedParamTest : public DeleteTableITest, |
| public ::testing::WithParamInterface<const char*> { |
| }; |
| |
// Test that if a server crashes mid-delete that the delete will be rolled
// forward on startup. Parameterized by different fault flags that cause a
// crash at various points.
TEST_P(DeleteTableDeletedParamTest, TestRollForwardDelete) {
  NO_FATALS(StartCluster());
  const string fault_flag = GetParam();
  LOG(INFO) << "Running with fault flag: " << fault_flag;

  // Dynamically set the fault flag so they crash when DeleteTablet() is called
  // by the Master. The value "1.0" is passed for every fault flag used here.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), fault_flag, "1.0"));
  }

  // Create a table on the cluster. We're just using TestWorkload
  // as a convenient way to create it.
  TestWorkload(cluster_.get()).Setup();

  // The table should have replicas on all three tservers.
  ASSERT_OK(inspect_->WaitForReplicaCount(3));

  // Delete it and wait for the tablet servers to crash.
  NO_FATALS(DeleteTable(TestWorkload::kDefaultTableName));
  NO_FATALS(WaitForAllTSToCrash());

  // There should still be data left on disk: CheckNoData() is expected to
  // report IllegalState.
  Status s = inspect_->CheckNoData();
  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();

  // Now restart the tablet servers. They should roll forward their deletes.
  // We don't have to reset the fault flag here because it was set dynamically.
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    cluster_->tablet_server(i)->Shutdown();
    ASSERT_OK(cluster_->tablet_server(i)->Restart());
  }
  ASSERT_OK(inspect_->WaitForNoData());
}
| |
| // Faults appropriate for the TABLET_DATA_DELETED case. |
| const char* deleted_faults[] = {"fault_crash_after_blocks_deleted", |
| "fault_crash_after_wal_deleted", |
| "fault_crash_after_cmeta_deleted"}; |
| |
| INSTANTIATE_TEST_CASE_P(FaultFlags, DeleteTableDeletedParamTest, |
| ::testing::ValuesIn(deleted_faults)); |
| |
| // Parameterized test case for TABLET_DATA_TOMBSTONED deletions. |
| class DeleteTableTombstonedParamTest : public DeleteTableITest, |
| public ::testing::WithParamInterface<const char*> { |
| }; |
| |
| // Regression test for tablet tombstoning. Tests: |
| // 1. basic creation & tombstoning of a tablet. |
| // 2. roll-forward (crash recovery) of a partially-completed tombstoning of a tablet. |
| // 3. permanent deletion of a TOMBSTONED tablet |
| // (transition from TABLET_DATA_TOMBSTONED to TABLET_DATA_DELETED). |
| TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) { |
| vector<string> flags; |
| // We want fast log rolls and deterministic preallocation, since we wait for |
| // a certain number of logs at the beginning of the test. |
| flags.emplace_back("--log_segment_size_mb=1"); |
| flags.emplace_back("--log_async_preallocate_segments=false"); |
| flags.emplace_back("--log_min_segments_to_retain=3"); |
| flags.emplace_back("--log_compression_codec=NO_COMPRESSION"); |
| NO_FATALS(StartCluster(flags)); |
| const string fault_flag = GetParam(); |
| LOG(INFO) << "Running with fault flag: " << fault_flag; |
| |
| MonoDelta timeout = MonoDelta::FromSeconds(30); |
| |
| // Create a table with 2 tablets. We delete the first tablet without |
| // injecting any faults, then we delete the second tablet while exercising |
| // several fault injection points. |
| const int kNumTablets = 2; |
| vector<const KuduPartialRow*> split_rows; |
| Schema schema(GetSimpleTestSchema()); |
| client::KuduSchema client_schema(client::KuduSchema::FromSchema(schema)); |
| KuduPartialRow* split_row = client_schema.NewRow(); |
| ASSERT_OK(split_row->SetInt32(0, numeric_limits<int32_t>::max() / kNumTablets)); |
| split_rows.push_back(split_row); |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| #pragma GCC diagnostic push |
| #pragma GCC diagnostic ignored "-Wdeprecated-declarations" |
| ASSERT_OK(table_creator->table_name(TestWorkload::kDefaultTableName) |
| .split_rows(split_rows) |
| .schema(&client_schema) |
| .set_range_partition_columns({ "key" }) |
| .num_replicas(3) |
| .Create()); |
| #pragma GCC diagnostic pop |
| |
| // Start a workload on the cluster, and run it until we find WALs on disk. |
| TestWorkload workload(cluster_.get()); |
| workload.set_payload_bytes(32 * 1024); // Write ops of size 32KB to quickly fill the logs. |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| |
| // The table should have 2 tablets (1 split) on all 3 tservers (for a total of 6). |
| ASSERT_OK(inspect_->WaitForReplicaCount(6)); |
| |
| // Set up the proxies so we can easily send DeleteTablet() RPCs. |
| TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()]; |
| |
| // Ensure the tablet server is reporting 2 tablets. |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; |
| ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets)); |
| |
| LOG(INFO) << "Starting workload..."; |
| |
| // Run the workload against whoever the leader is until WALs appear on TS 0 |
| // for the tablets we created. |
| const int kTsIndex = 0; // Index of the tablet server we'll use for the test. |
| workload.Start(); |
| while (workload.rows_inserted() < 100) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
| LOG(INFO) << "Waiting for 3 wal files for tablet " |
| << tablets[0].tablet_status().tablet_id() << "..."; |
| ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kTsIndex, |
| tablets[0].tablet_status().tablet_id(), 3)); |
| |
| LOG(INFO) << "Waiting for 3 wal files for tablet " |
| << tablets[1].tablet_status().tablet_id() << "..."; |
| ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kTsIndex, |
| tablets[1].tablet_status().tablet_id(), 3)); |
| |
| LOG(INFO) << "Stopping workload..."; |
| workload.StopAndJoin(); |
| |
| // Shut down the master and the other tablet servers so they don't interfere |
| // by attempting to create or copy tablets while we delete tablets. |
| cluster_->master()->Shutdown(); |
| cluster_->tablet_server(1)->Shutdown(); |
| cluster_->tablet_server(2)->Shutdown(); |
| |
| // Tombstone the first tablet. |
| string tablet_id = tablets[0].tablet_status().tablet_id(); |
| LOG(INFO) << "Tombstoning first tablet " << tablet_id << "..."; |
| ASSERT_TRUE(inspect_->DoesConsensusMetaExistForTabletOnTS(kTsIndex, tablet_id)) << tablet_id; |
| ASSERT_OK(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout)); |
| LOG(INFO) << "Waiting for first tablet to be tombstoned..."; |
| NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED)); |
| |
| ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets)); |
| for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) { |
| if (t.tablet_status().tablet_id() == tablet_id) { |
| ASSERT_EQ(tablet::STOPPED, t.tablet_status().state()); |
| ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state()) |
| << t.tablet_status().tablet_id() << " not tombstoned"; |
| } |
| } |
| |
| // Now tombstone the 2nd tablet, causing a fault. |
| ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kTsIndex), fault_flag, "1.0")); |
| tablet_id = tablets[1].tablet_status().tablet_id(); |
| LOG(INFO) << "Tombstoning second tablet " << tablet_id << "..."; |
| ignore_result(DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout)); |
| NO_FATALS(WaitForTSToCrash(kTsIndex)); |
| |
| // Restart the tablet server and wait for the WALs to be deleted and for the |
| // superblock to show that it is tombstoned. |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart()); |
| LOG(INFO) << "Waiting for second tablet to be tombstoned..."; |
| NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED)); |
| // The tombstoned tablets will still show up in ListTablets(), |
| // just with their data state set as TOMBSTONED. They should also be listed |
| // as INITIALIZED because we restarted the server. |
| ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets)); |
| for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) { |
| ASSERT_EQ(tablet::INITIALIZED, t.tablet_status().state()); |
| ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state()) |
| << t.tablet_status().tablet_id() << " not tombstoned"; |
| } |
| |
| // Check that, upon restart of the tablet server with a tombstoned tablet, |
| // we don't unnecessary "roll forward" and rewrite the tablet metadata file |
| // when it is already fully deleted. |
| int64_t orig_mtime = inspect_->GetTabletSuperBlockMTimeOrDie(kTsIndex, tablet_id); |
| cluster_->tablet_server(kTsIndex)->Shutdown(); |
| ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart()); |
| int64_t new_mtime = inspect_->GetTabletSuperBlockMTimeOrDie(kTsIndex, tablet_id); |
| ASSERT_EQ(orig_mtime, new_mtime) |
| << "Tablet superblock should not have been re-flushed unnecessarily"; |
| |
| // Finally, delete all tablets on the TS, and wait for all data to be gone. |
| LOG(INFO) << "Deleting all tablets..."; |
| for (const ListTabletsResponsePB::StatusAndSchemaPB& tablet : tablets) { |
| string tablet_id = tablet.tablet_status().tablet_id(); |
| // We need retries here, since some of the tablets may still be |
| // bootstrapping after being restarted above. |
| ASSERT_OK(DeleteTabletWithRetries(ts, tablet_id, TABLET_DATA_DELETED, timeout)); |
| } |
| ASSERT_OK(inspect_->WaitForNoDataOnTS(kTsIndex)); |
| } |
| |
| // Faults appropriate for the TABLET_DATA_TOMBSTONED case. |
| // Tombstoning a tablet does not delete the consensus metadata. |
| const char* tombstoned_faults[] = {"fault_crash_after_blocks_deleted", |
| "fault_crash_after_wal_deleted"}; |
| INSTANTIATE_TEST_CASE_P(FaultFlags, DeleteTableTombstonedParamTest, |
| ::testing::ValuesIn(tombstoned_faults)); |
| |
| |
| class DeleteTableWhileScanInProgressParamTest : |
| public DeleteTableITest, |
| public ::testing::WithParamInterface< |
| std::tuple<KuduScanner::ReadMode, KuduClient::ReplicaSelection>> { |
| }; |
| |
| // Make sure the tablet server keeps the necessary data to serve scan request in |
| // progress if tablet is marked for deletion. |
| TEST_P(DeleteTableWhileScanInProgressParamTest, Test) { |
| const auto read_mode_to_string = [](KuduScanner::ReadMode mode) { |
| switch (mode) { |
| case KuduScanner::READ_LATEST: |
| return "READ_LATEST"; |
| case KuduScanner::READ_AT_SNAPSHOT: |
| return "READ_AT_SNAPSHOT"; |
| case KuduScanner::READ_YOUR_WRITES: |
| return "READ_YOUR_WRITES"; |
| default: |
| return "UNKNOWN"; |
| } |
| }; |
| const auto replica_sel_to_string = [](KuduClient::ReplicaSelection sel) { |
| switch (sel) { |
| case KuduClient::LEADER_ONLY: |
| return "LEADER_ONLY"; |
| case KuduClient::CLOSEST_REPLICA: |
| return "CLOSEST_REPLICA"; |
| case KuduClient::FIRST_REPLICA: |
| return "FIRST_REPLICA"; |
| default: |
| return "UNKNOWN"; |
| } |
| }; |
| |
| const std::vector<std::string> extra_ts_flags = { |
| // Set the flush threshold low so that we have a mix of flushed and |
| // unflushed operations in the WAL, when we bootstrap. |
| "--flush_threshold_mb=1", |
| |
| // Set the compaction budget to be low so that we get multiple passes |
| // of compaction instead of selecting all of the rowsets in a single |
| // compaction of the whole tablet. |
| "--tablet_compaction_budget_mb=1", |
| |
| // Set the major delta compaction ratio low enough that we trigger |
| // a lot of them. |
| "--tablet_delta_store_major_compact_min_ratio=0.001", |
| }; |
| |
| // Approximate number of rows to insert. This is not exact number due to the |
| // way the test controls the progress of the test workload. |
| #if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) |
| // Test is too slow in ASAN/TSAN. |
| const size_t rows_to_insert = 10000; |
| #else |
| const size_t rows_to_insert = AllowSlowTests() ? 100000 : 10000; |
| #endif |
| |
| const auto& param = GetParam(); |
| const auto mode = std::get<0>(param); |
| const auto sel = std::get<1>(param); |
| |
| // In case of failure, print out string representation of the parameterized |
| // case for ease of troubleshooting. |
| SCOPED_TRACE(Substitute("read mode $0; replica selection mode $1", |
| read_mode_to_string(mode), |
| replica_sel_to_string(sel))); |
| NO_FATALS(StartCluster(extra_ts_flags)); |
| |
| TestWorkload w(cluster_.get()); |
| w.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS); |
| w.Setup(); |
| |
| // Start the workload, and wait to see some rows actually inserted. |
| w.Start(); |
| while (w.rows_inserted() < rows_to_insert) { |
| SleepFor(MonoDelta::FromMilliseconds(50)); |
| } |
| w.StopAndJoin(); |
| const int64_t ref_row_count = w.rows_inserted(); |
| |
| using kudu::client::sp::shared_ptr; |
| shared_ptr<KuduTable> table; |
| ASSERT_OK(w.client()->OpenTable(w.table_name(), &table)); |
| KuduScanner scanner(table.get()); |
| ASSERT_OK(scanner.SetReadMode(mode)); |
| ASSERT_OK(scanner.SetSelection(sel)); |
| // Setup batch size to be small enough to guarantee the scan |
| // will not fetch all the data at once. |
| ASSERT_OK(scanner.SetBatchSizeBytes(1)); |
| ASSERT_OK(scanner.Open()); |
| ASSERT_TRUE(scanner.HasMoreRows()); |
| KuduScanBatch batch; |
| ASSERT_OK(scanner.NextBatch(&batch)); |
| size_t row_count = batch.NumRows(); |
| |
| // Once the first batch of data has been fetched and there is some more |
| // to fetch, delete the table. |
| NO_FATALS(DeleteTable(w.table_name(), ON_ERROR_DO_NOT_DUMP_STACKS)); |
| |
| // Wait while the table is no longer advertised on the cluster. |
| // This ensures the table deletion request has been processed by tablet |
| // servers. |
| vector<string> tablets; |
| do { |
| SleepFor(MonoDelta::FromMilliseconds(250)); |
| tablets = inspect_->ListTablets(); |
| } while (!tablets.empty()); |
| |
| // Make sure the scanner can continue and fetch the rest of rows. |
| ASSERT_TRUE(scanner.HasMoreRows()); |
| while (scanner.HasMoreRows()) { |
| KuduScanBatch batch; |
| const Status s = scanner.NextBatch(&batch); |
| ASSERT_TRUE(s.ok()) << s.ToString(); |
| row_count += batch.NumRows(); |
| } |
| |
| // Verify the total row count. The exact count must be there in case of |
| // READ_AT_SNAPSHOT mode regardless of replica selection or if reading |
| // from a leader tablet in any scan mode. In the case of the READ_LATEST |
| // mode the data might be fetched from a lagging replica and the scan |
| // row count might be less than the inserted row count. |
| if (mode == KuduScanner::READ_AT_SNAPSHOT || |
| sel == KuduClient::LEADER_ONLY) { |
| EXPECT_EQ(ref_row_count, row_count); |
| } |
| |
| // Close the scanner to make sure it does not hold any references on the |
| // data about to be deleted by the hosting tablet server. |
| scanner.Close(); |
| |
| // Make sure the table has been deleted. |
| EXPECT_OK(inspect_->WaitForNoData()); |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| NO_FATALS(StopCluster()); |
| } |
| |
| const KuduScanner::ReadMode read_modes[] = { |
| KuduScanner::READ_LATEST, |
| KuduScanner::READ_AT_SNAPSHOT, |
| KuduScanner::READ_YOUR_WRITES, |
| }; |
| const KuduClient::ReplicaSelection replica_selectors[] = { |
| KuduClient::LEADER_ONLY, |
| KuduClient::CLOSEST_REPLICA, |
| KuduClient::FIRST_REPLICA, |
| }; |
| INSTANTIATE_TEST_CASE_P( |
| Params, DeleteTableWhileScanInProgressParamTest, |
| ::testing::Combine(::testing::ValuesIn(read_modes), |
| ::testing::ValuesIn(replica_selectors))); |
| |
| } // namespace kudu |