| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <cstdint> |
| #include <functional> |
| #include <initializer_list> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/log.pb.h" |
| #include "kudu/consensus/log_index.h" |
| #include "kudu/consensus/log_reader.h" |
| #include "kudu/consensus/log_util.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/internal_mini_cluster-itest-base.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/mini-cluster/internal_mini_cluster.h" |
| #include "kudu/mini-cluster/mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/mvcc.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_service.proxy.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_bool(enable_maintenance_manager); |
| DECLARE_bool(compaction_force_small_rowset_tradeoff); |
| DECLARE_bool(prevent_kudu_2233_corruption); |
| DECLARE_bool(log_preallocate_segments); |
| DECLARE_bool(log_async_preallocate_segments); |
| DECLARE_bool(raft_enable_pre_election); |
| DECLARE_double(compaction_minimum_improvement); |
| DECLARE_double(leader_failure_max_missed_heartbeat_periods); |
| DECLARE_int32(consensus_inject_latency_ms_in_notifications); |
| DECLARE_int32(heartbeat_interval_ms); |
| DECLARE_int32(raft_heartbeat_interval_ms); |
| DECLARE_int32(tablet_compaction_budget_mb); |
| DECLARE_int32(tablet_history_max_age_sec); |
| |
| #ifndef NDEBUG |
| DECLARE_bool(dcheck_on_kudu_2233_invariants); |
| #endif |
| |
| namespace kudu { |
| |
| using consensus::RaftPeerPB; |
| using log::LogReader; |
| using pb_util::SecureShortDebugString; |
| using rpc::RpcController; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using tablet::TabletReplica; |
| using tserver::MiniTabletServer; |
| using tserver::NewScanRequestPB; |
| using tserver::ScanResponsePB; |
| using tserver::ScanRequestPB; |
| using tserver::TabletServerErrorPB; |
| using tserver::TabletServerServiceProxy; |
| |
| namespace itest { |
| |
| class TimestampAdvancementITest : public MiniClusterITestBase { |
| public: |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| |
| // Many tests will operate on a single tserver. The first one is chosen |
| // arbitrarily. |
| const int kTserver = 0; |
| |
| // Sets up a cluster and returns the tablet replica on 'ts' that has written |
| // to its WAL. 'replica' will write further messages to a new WAL segment. |
| // If 'delete_and_reinsert' is set to true, rows will be flushed, deleted, |
| // reinserted, and flushed again. |
| void SetupClusterWithWritesInWAL(int ts, bool delete_and_reinsert, |
| scoped_refptr<TabletReplica>* replica) { |
| // We're going to manually trigger maintenance ops, so disable maintenance op |
| // scheduling. |
| FLAGS_enable_maintenance_manager = false; |
| |
| // Prevent preallocation of WAL segments in order to prevent races between |
| // the WAL allocation thread and our manual rolling over of the WAL. |
| FLAGS_log_preallocate_segments = false; |
| FLAGS_log_async_preallocate_segments = false; |
| |
| NO_FATALS(StartCluster(3)); |
| |
| // Write some rows to the cluster. |
| TestWorkload write(cluster_.get()); |
| |
| // Set a low batch size so we have finer-grained control over flushing of |
| // the WAL. Too large, and the WAL may end up flushing in the background. |
| write.set_write_batch_size(1); |
| // Certain corruptions only happen when there are deletes and reinserts, so |
| // perform those ops here and below. |
| if (delete_and_reinsert) { |
| write.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS_WITH_DELETE); |
| } |
| write.Setup(); |
| write.Start(); |
| table_name_ = write.table_name(); |
| while (write.batches_completed() < 10) { |
| SleepFor(MonoDelta::FromMilliseconds(1)); |
| } |
| write.StopAndJoin(); |
| auto batches_completed = write.batches_completed(); |
| scoped_refptr<TabletReplica> tablet_replica = tablet_replica_on_ts(ts); |
| if (delete_and_reinsert) { |
| // Flush so we get a disk rowset with our deleted rows, before inserting |
| // to a new rowset. |
| ASSERT_OK(tablet_replica->tablet()->Flush()); |
| TestWorkload write(cluster_.get()); |
| write.set_write_batch_size(1); |
| write.set_table_name(table_name_); |
| write.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); |
| write.Setup(); |
| write.Start(); |
| while (write.batches_completed() < 10) { |
| SleepFor(MonoDelta::FromMilliseconds(1)); |
| } |
| write.StopAndJoin(); |
| batches_completed += write.batches_completed(); |
| } |
| // Ensure that the replicas eventually get to a point where all of them |
| // have all the rows. This will allow us to GC the WAL, as they will not |
| // need to retain them if fully-replicated. |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), |
| ts_map_, tablet_replica->tablet_id(), batches_completed)); |
| |
| // Flush the current log batch and roll over to get a fresh WAL segment. |
| ASSERT_OK(tablet_replica->log()->WaitUntilAllFlushed()); |
| ASSERT_OK(tablet_replica->log()->AllocateSegmentAndRollOverForTests()); |
| ASSERT_OK(tablet_replica->tablet()->Flush()); |
| |
| *replica = std::move(tablet_replica); |
| } |
| |
| // Returns the tablet server 'ts'. |
| MiniTabletServer* tserver(int ts) const { |
| DCHECK(cluster_); |
| return cluster_->mini_tablet_server(ts); |
| } |
| |
| // Get the tablet replica on the tablet server 'ts'. |
| scoped_refptr<TabletReplica> tablet_replica_on_ts(int ts) const { |
| vector<scoped_refptr<TabletReplica>> replicas; |
| tserver(ts)->server()->tablet_manager()->GetTabletReplicas(&replicas); |
| DCHECK_EQ(1, replicas.size()); |
| return replicas[0]; |
| } |
| |
| // Returns a scan response from the tablet on the given tablet server. |
| ScanResponsePB ScanResponseForTablet(int ts, const string& tablet_id) const { |
| ScanResponsePB resp; |
| RpcController rpc; |
| ScanRequestPB req; |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(tablet_id); |
| scan->set_read_mode(READ_LATEST); |
| const Schema schema = GetSimpleTestSchema(); |
| CHECK_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns())); |
| shared_ptr<TabletServerServiceProxy> tserver_proxy = cluster_->tserver_proxy(ts); |
| CHECK_OK(tserver_proxy->Scan(req, &resp, &rpc)); |
| return resp; |
| } |
| |
| // Returns true if there are any write replicate messages in the WALs of |
| // 'tablet_id' on 'ts'. |
| Status CheckForWriteReplicatesInLog(MiniTabletServer* ts, const string& tablet_id, |
| bool* has_write_replicates) const { |
| shared_ptr<LogReader> reader; |
| RETURN_NOT_OK(LogReader::Open( |
| ts->server()->fs_manager(), |
| /*index*/nullptr, |
| tablet_id, |
| /*metric_entity*/nullptr, |
| ts->server()->file_cache(), |
| &reader)); |
| log::SegmentSequence segs; |
| reader->GetSegmentsSnapshot(&segs); |
| unique_ptr<log::LogEntryPB> entry; |
| for (const auto& seg : segs) { |
| log::LogEntryReader reader(seg.get()); |
| while (true) { |
| Status s = reader.ReadNextEntry(&entry); |
| if (s.IsEndOfFile()) break; |
| RETURN_NOT_OK(s); |
| if (entry->type() == log::REPLICATE && |
| entry->replicate().op_type() == consensus::WRITE_OP) { |
| *has_write_replicates = true; |
| return Status::OK(); |
| } |
| } |
| } |
| *has_write_replicates = false; |
| return Status::OK(); |
| } |
| |
| // Repeatedly GCs the replica's WALs until there are no more write replicates |
| // in the WAL. |
| void GCUntilNoWritesInWAL(MiniTabletServer* tserver, |
| scoped_refptr<TabletReplica> replica) { |
| ASSERT_EVENTUALLY([&] { |
| LOG(INFO) << "GCing logs..."; |
| int64_t gcable_size; |
| ASSERT_OK(replica->GetGCableDataSize(&gcable_size)); |
| ASSERT_GT(gcable_size, 0); |
| ASSERT_OK(replica->RunLogGC()); |
| |
| // Ensure that we have no writes in our WALs. |
| bool has_write_replicates; |
| ASSERT_OK(CheckForWriteReplicatesInLog(tserver, replica->tablet_id(), |
| &has_write_replicates)); |
| ASSERT_FALSE(has_write_replicates); |
| }); |
| } |
| |
| // Shuts down all the nodes in a cluster and restarts the given tserver. |
| // Waits for the given replica on the tserver to start. |
| Status ShutdownAllNodesAndRestartTserver(MiniTabletServer* tserver, |
| const string& tablet_id) { |
| LOG(INFO) << "Shutting down cluster..."; |
| cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); |
| // Note: We shut down tservers individually rather than using |
| // ClusterNodes::TS, since the latter would invalidate our reference to |
| // 'tserver'. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| cluster_->mini_tablet_server(i)->Shutdown(); |
| } |
| LOG(INFO) << "Restarting single tablet server..."; |
| RETURN_NOT_OK(tserver->Restart()); |
| TServerDetails* ts_details = FindOrDie(ts_map_, tserver->uuid()); |
| return WaitUntilTabletRunning(ts_details, tablet_id, kTimeout); |
| } |
| protected: |
| string table_name_; |
| }; |
| |
| // Simulate being on an older version of Kudu and set up the conditions for |
| // KUDU-2233, where there are no writes in the WAL, and perform a compaction. |
| // Then, "upgrade" to a more recent version that handles KUDU-2233 corruption, |
| // and ensure that compacting corrupted data is handled gracefully. |
| TEST_F(TimestampAdvancementITest, TestUpgradeFromOlderCorruptedData) { |
| FLAGS_prevent_kudu_2233_corruption = false; |
| // Set a low Raft heartbeat interval so we can inject churn elections. |
| FLAGS_raft_heartbeat_interval_ms = 100; |
| |
| // This test case starts out with two overlapping rowsets, and to witness |
| // corruption in a way that causes compaction failures post-upgrade, we'll |
| // need to KUDU-2233-compact just one of them. Do so by reducing the |
| // compaction budget, and allowing small minimum improvement scores. |
| FLAGS_tablet_compaction_budget_mb = 1; |
| FLAGS_compaction_force_small_rowset_tradeoff = true; |
| FLAGS_compaction_minimum_improvement = -1; |
| |
| // Setup a cluster with some writes and a new WAL segment. |
| scoped_refptr<TabletReplica> replica; |
| NO_FATALS(SetupClusterWithWritesInWAL(kTserver, /*delete_and_reinsert=*/true, &replica)); |
| |
| MiniTabletServer* ts = tserver(kTserver); |
| const string tablet_id = replica->tablet_id(); |
| |
| // Now that we're on a new WAL segment, inject latency to consensus so we |
| // trigger elections, and wait for some terms to pass. |
| FLAGS_raft_enable_pre_election = false; |
| FLAGS_leader_failure_max_missed_heartbeat_periods = 1.0; |
| FLAGS_consensus_inject_latency_ms_in_notifications = 100; |
| const int64_t kNumExtraTerms = 10; |
| int64_t initial_raft_term = replica->consensus()->CurrentTerm(); |
| int64_t raft_term = initial_raft_term; |
| while (raft_term < initial_raft_term + kNumExtraTerms) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| raft_term = replica->consensus()->CurrentTerm(); |
| } |
| |
| // Reduce election churn so we can achieve a stable quorum and get to a point |
| // where we can GC our logs. Note: we won't GC if there are replicas that |
| // need to be caught up. |
| FLAGS_consensus_inject_latency_ms_in_notifications = 0; |
| NO_FATALS(GCUntilNoWritesInWAL(ts, replica)); |
| replica.reset(); |
| ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); |
| |
| replica = tablet_replica_on_ts(kTserver); |
| Timestamp cleantime = replica->tablet()->mvcc_manager()->GetCleanTimestamp(); |
| ASSERT_EQ(Timestamp::kInitialTimestamp, cleantime); |
| |
| // Perform a compaction with the low budget so only a single rowset is |
| // "compacted" (yes, we can compact a single rowset, no it's not natural to |
| // do so, but it's what we need here). This will ensure that only one of the |
| // two version of a given row is corrupted, which should allow us to perform |
| // a compaction later that fails. |
| ASSERT_OK(replica->tablet()->Compact(tablet::Tablet::COMPACT_NO_FLAGS)); |
| |
| // Now restart the server with more normal flags. Despite disallowing further |
| // corruptions, when we compact, we should still be left with an error, as |
| // our data has been corrupted on an simulated older version. |
| replica.reset(); |
| FLAGS_prevent_kudu_2233_corruption = true; |
| FLAGS_tablet_compaction_budget_mb = 128; |
| // Don't crash in the event of broken invariants though, since we're working |
| // with corrupted data here. |
| #ifndef NDEBUG |
| FLAGS_dcheck_on_kudu_2233_invariants = false; |
| #endif |
| ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); |
| |
| replica = tablet_replica_on_ts(kTserver); |
| Status s = replica->tablet()->Compact(tablet::Tablet::COMPACT_NO_FLAGS); |
| ASSERT_TRUE(s.IsCorruption()) << s.ToString(); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_EQ(tablet::TabletStatePB::FAILED, replica->state()); |
| }); |
| } |
| |
| // Test that bootstrapping a Raft no-op from the WAL will advance the replica's |
| // MVCC clean time timestamps. |
| TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) { |
| // Set a low Raft heartbeat interval so we can inject churn elections. |
| FLAGS_raft_heartbeat_interval_ms = 100; |
| |
| // Setup a cluster with some writes and a new WAL segment. |
| scoped_refptr<TabletReplica> replica; |
| NO_FATALS(SetupClusterWithWritesInWAL(kTserver, /*delete_and_reinsert=*/false, &replica)); |
| MiniTabletServer* ts = tserver(kTserver); |
| const string tablet_id = replica->tablet_id(); |
| |
| // Now that we're on a new WAL segment, inject latency to consensus so we |
| // trigger elections, and wait for some terms to pass. |
| FLAGS_raft_enable_pre_election = false; |
| FLAGS_leader_failure_max_missed_heartbeat_periods = 1.0; |
| FLAGS_consensus_inject_latency_ms_in_notifications = 100; |
| const int64_t kNumExtraTerms = 10; |
| int64_t initial_raft_term = replica->consensus()->CurrentTerm(); |
| int64_t raft_term = initial_raft_term; |
| while (raft_term < initial_raft_term + kNumExtraTerms) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| raft_term = replica->consensus()->CurrentTerm(); |
| } |
| |
| // Reduce election churn so we can achieve a stable quorum and get to a point |
| // where we can GC our logs. Note: we won't GC if there are replicas that |
| // need to be caught up. |
| FLAGS_consensus_inject_latency_ms_in_notifications = 0; |
| NO_FATALS(GCUntilNoWritesInWAL(ts, replica)); |
| replica.reset(); |
| ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); |
| |
| // Despite there being no writes, there are no-ops, with which we can advance |
| // MVCC's timestamps. |
| replica = tablet_replica_on_ts(kTserver); |
| Timestamp cleantime = replica->tablet()->mvcc_manager()->GetCleanTimestamp(); |
| ASSERT_NE(cleantime, Timestamp::kInitialTimestamp); |
| |
| // Verify that we can scan the replica with its MVCC timestamp raised. |
| ScanResponsePB resp = ScanResponseForTablet(kTserver, replica->tablet_id()); |
| ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); |
| } |
| |
| // Regression test for KUDU-2463, wherein scans would return incorrect results |
| // if a tablet's MVCC snapshot hasn't advanced. Currently, the only way to |
| // achieve this is if the cluster is restarted, the WAL only has change |
| // configs, and the tablet cannot join a quorum. |
| TEST_F(TimestampAdvancementITest, Kudu2463Test) { |
| scoped_refptr<TabletReplica> replica; |
| NO_FATALS(SetupClusterWithWritesInWAL(kTserver, /*delete_and_reinsert=*/false, &replica)); |
| MiniTabletServer* ts = tserver(kTserver); |
| |
| const string tablet_id = replica->tablet_id(); |
| |
| // Update one of the followers repeatedly to generate a bunch of config |
| // changes in all the replicas' WALs. |
| TServerDetails* leader; |
| ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader)); |
| vector<TServerDetails*> followers; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(FindTabletFollowers(ts_map_, tablet_id, kTimeout, &followers)); |
| }); |
| ASSERT_FALSE(followers.empty()); |
| for (int i = 0; i < 20; i++) { |
| RaftPeerPB::MemberType type = i % 2 == 0 ? RaftPeerPB::NON_VOTER : RaftPeerPB::VOTER; |
| WARN_NOT_OK(ChangeReplicaType(leader, tablet_id, followers[0], type, kTimeout), |
| "Couldn't send a change config!"); |
| } |
| NO_FATALS(GCUntilNoWritesInWAL(ts, replica)); |
| |
| // Note: we need to reset the replica reference before restarting the server. |
| replica.reset(); |
| ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); |
| |
| // Now open a scanner for the server. |
| ScanResponsePB resp = ScanResponseForTablet(kTserver, tablet_id); |
| |
| // Scanning the tablet should yield an error. |
| LOG(INFO) << "Got response: " << SecureShortDebugString(resp); |
| ASSERT_TRUE(resp.has_error()); |
| const TabletServerErrorPB& error = resp.error(); |
| ASSERT_EQ(error.code(), TabletServerErrorPB::TABLET_NOT_RUNNING); |
| ASSERT_STR_CONTAINS(resp.error().status().message(), "clean time has not yet been initialized"); |
| ASSERT_EQ(error.status().code(), AppStatusPB::UNINITIALIZED); |
| } |
| |
| // Test to ensure that MVCC's current snapshot gets updated via Raft no-ops, in |
| // both the "normal" case and the single-replica case. |
| TEST_F(TimestampAdvancementITest, TestTimestampsAdvancedFromRaftNoOp) { |
| const int kTserver = 0; |
| for (int num_replicas : { 1, 3 }) { |
| LOG(INFO) << strings::Substitute("Running with $0 replicas", num_replicas); |
| NO_FATALS(StartCluster(num_replicas)); |
| |
| // Create an empty tablet with a single replica. |
| TestWorkload create_tablet(cluster_.get()); |
| create_tablet.set_num_replicas(num_replicas); |
| create_tablet.Setup(); |
| scoped_refptr<TabletReplica> replica = tablet_replica_on_ts(kTserver); |
| |
| // Despite there not being any writes, the replica will eventually bump its |
| // MVCC clean time on its own when a leader gets elected and replicates a |
| // no-op message. |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_NE(replica->tablet()->mvcc_manager()->GetCleanTimestamp(), |
| Timestamp::kInitialTimestamp); |
| }); |
| replica.reset(); |
| StopCluster(); |
| } |
| } |
| |
| } // namespace itest |
| } // namespace kudu |