| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/integration-tests/raft_consensus-itest-base.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client-test-util.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/mini_cluster_fs_inspector.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/integration-tests/ts_itest-base.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/tserver/tablet_server-test-base.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_service.proxy.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| // The IWYU confuses BehindWalGcBehavior::SHUTDOWN with TabletStatePB::SHUTDOWN. |
| // IWYU pragma: no_include "kudu/tablet/metadata.pb.h" |
| |
| DEFINE_int32(num_client_threads, 8, |
| "Number of client threads to launch"); |
| DEFINE_int32(client_inserts_per_thread, 50, |
| "Number of rows inserted by each client thread"); |
| DECLARE_int32(consensus_rpc_timeout_ms); |
| DEFINE_int32(client_num_batches_per_thread, 5, |
| "In how many batches to group the rows, for each client"); |
| |
| METRIC_DECLARE_entity(tablet); |
| METRIC_DECLARE_counter(transaction_memory_pressure_rejections); |
| METRIC_DECLARE_gauge_int64(raft_term); |
| |
| using kudu::client::KuduInsert; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduTable; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalTabletServer; |
| using kudu::consensus::OpId; |
| using kudu::itest::GetInt64Metric; |
| using kudu::itest::TServerDetails; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::rpc::RpcController; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| |
| namespace kudu { |
| namespace tserver { |
| |
// Value (in milliseconds, per the flag's name) that SetUp() assigns to
// --consensus_rpc_timeout_ms for every test using this fixture.
static constexpr int kConsensusRpcTimeoutForTests = 50;
| |
| RaftConsensusITestBase::RaftConsensusITestBase() |
| : inserters_(FLAGS_num_client_threads) { |
| } |
| |
| void RaftConsensusITestBase::SetUp() { |
| TabletServerIntegrationTestBase::SetUp(); |
| FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests; |
| } |
| |
| void RaftConsensusITestBase::ScanReplica(TabletServerServiceProxy* replica_proxy, |
| vector<string>* results) { |
| ScanRequestPB req; |
| ScanResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromSeconds(10)); // Squelch warnings. |
| |
| NewScanRequestPB* scan = req.mutable_new_scan_request(); |
| scan->set_tablet_id(tablet_id_); |
| ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns())); |
| |
| // Send the call |
| { |
| req.set_batch_size_bytes(0); |
| SCOPED_TRACE(SecureDebugString(req)); |
| ASSERT_OK(replica_proxy->Scan(req, &resp, &rpc)); |
| SCOPED_TRACE(SecureDebugString(resp)); |
| if (resp.has_error()) { |
| ASSERT_OK(StatusFromPB(resp.error().status())); |
| } |
| } |
| |
| if (!resp.has_more_results()) { |
| return; |
| } |
| |
| // Drain all the rows from the scanner. |
| NO_FATALS(DrainScannerToStrings(resp.scanner_id(), |
| schema_, |
| results, |
| replica_proxy)); |
| |
| std::sort(results->begin(), results->end()); |
| } |
| |
| void RaftConsensusITestBase::InsertTestRowsRemoteThread( |
| int first_row, int count, int num_batches, |
| const vector<unique_ptr<CountDownLatch>>& latches) { |
| shared_ptr<KuduTable> table; |
| CHECK_OK(client_->OpenTable(kTableId, &table)); |
| |
| shared_ptr<KuduSession> session = client_->NewSession(); |
| session->SetTimeoutMillis(60000); |
| CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| |
| for (int i = 0; i < num_batches; i++) { |
| int first_row_in_batch = first_row + (i * count / num_batches); |
| int last_row_in_batch = first_row_in_batch + count / num_batches; |
| |
| for (int j = first_row_in_batch; j < last_row_in_batch; j++) { |
| unique_ptr<KuduInsert> insert(table->NewInsert()); |
| KuduPartialRow* row = insert->mutable_row(); |
| CHECK_OK(row->SetInt32(0, j)); |
| CHECK_OK(row->SetInt32(1, j * 2)); |
| CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", j)))); |
| CHECK_OK(session->Apply(insert.release())); |
| } |
| |
| FlushSessionOrDie(session); |
| |
| int inserted = last_row_in_batch - first_row_in_batch; |
| for (const auto& latch : latches) { |
| latch->CountDown(inserted); |
| } |
| } |
| |
| inserters_.CountDown(); |
| } |
| |
| void RaftConsensusITestBase::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) { |
| // We configure a small log segment size so that we roll frequently, |
| // configure a small cache size so that we evict data from the cache, and |
| // retain as few segments as possible. We also turn off async segment |
| // allocation -- this ensures that we roll many segments of logs (with async |
| // allocation, it's possible that the preallocation is slow and we wouldn't |
| // roll deterministically). |
| // |
| // Additionally, we disable log compression, since these tests write a lot of |
| // repetitive data to cause the rolls, and compression would make it all tiny. |
| extra_tserver_flags->push_back("--log_compression_codec=no_compression"); |
| extra_tserver_flags->push_back("--log_cache_size_limit_mb=1"); |
| extra_tserver_flags->push_back("--log_segment_size_mb=1"); |
| extra_tserver_flags->push_back("--log_async_preallocate_segments=false"); |
| extra_tserver_flags->push_back("--log_min_segments_to_retain=1"); |
| extra_tserver_flags->push_back("--log_max_segments_to_retain=3"); |
| extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100"); |
| extra_tserver_flags->push_back("--log_target_replay_size_mb=1"); |
| // We write 128KB cells in CauseFollowerToFallBehindLogGC(): bump the limit. |
| extra_tserver_flags->push_back("--max_cell_size_bytes=1000000"); |
| } |
| |
| void RaftConsensusITestBase::CauseFollowerToFallBehindLogGC( |
| const itest::TabletServerMap& tablet_servers, |
| string* leader_uuid, |
| int64_t* orig_term, |
| string* fell_behind_uuid, |
| BehindWalGcBehavior tserver_behavior, |
| const MonoDelta& pre_workload_delay) { |
| MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| // Wait for all of the replicas to have acknowledged the elected |
| // leader and logged the first NO_OP. |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers, tablet_id_, 1)); |
| |
| // Pause or shutdown one server. This might be the leader, and making it |
| // unresponsive will cause a leader election to happen. |
| TServerDetails* replica = tablet_replicas_.begin()->second; |
| CauseSpecificFollowerToFallBehindLogGC(tablet_servers, replica->uuid(), |
| leader_uuid, orig_term, |
| tserver_behavior, pre_workload_delay); |
| if (fell_behind_uuid) *fell_behind_uuid = replica->uuid(); |
| } |
| |
| void RaftConsensusITestBase::CauseSpecificFollowerToFallBehindLogGC( |
| const itest::TabletServerMap& tablet_servers, |
| const string& follower_uuid_to_fail, |
| string* leader_uuid, |
| int64_t* orig_term, |
| BehindWalGcBehavior tserver_behavior, |
| const MonoDelta& pre_workload_delay) { |
| MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| |
| TServerDetails* replica = FindOrDie(tablet_servers_, follower_uuid_to_fail); |
| ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
| switch (tserver_behavior) { |
| case BehindWalGcBehavior::STOP_CONTINUE: |
| ASSERT_OK(replica_ets->Pause()); |
| break; |
| case BehindWalGcBehavior::SHUTDOWN_RESTART: FALLTHROUGH_INTENDED; |
| case BehindWalGcBehavior::SHUTDOWN: |
| replica_ets->Shutdown(); |
| break; |
| case BehindWalGcBehavior::DO_NOT_TAMPER: |
| // Do nothing. |
| break; |
| default: |
| CHECK(false) << tserver_behavior |
| << ": unknown behavior for tserver behind WAL GC threshold"; |
| break; |
| } |
| |
| // Find a leader. In case we paused the leader above, this will wait until |
| // we have elected a new one. |
| TServerDetails* leader = nullptr; |
| while (true) { |
| Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader); |
| if (s.ok() && leader != nullptr && leader != replica) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| if (leader_uuid) *leader_uuid = leader->uuid(); |
| int leader_index = cluster_->tablet_server_index_by_uuid(leader->uuid()); |
| |
| if (pre_workload_delay.Initialized()) { |
| SleepFor(pre_workload_delay); |
| } |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(true); |
| workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB. |
| workload.set_write_batch_size(1); |
| workload.set_num_write_threads(4); |
| workload.Setup(); |
| workload.Start(); |
| |
| LOG(INFO) << "Waiting until we've written at least 4MB..."; |
| while (workload.rows_inserted() < 8 * 4) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| |
| LOG(INFO) << "Waiting for log GC on " << leader->uuid(); |
| // Some WAL segments must exist, but wal segment 1 must not exist. |
| ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs( |
| leader_index, tablet_id_, { "wal-" }, { "wal-000000001" })); |
| |
| LOG(INFO) << "Log GC complete on " << leader->uuid(); |
| |
| // Then wait another couple of seconds to be sure that it has bothered to try |
| // to write to the paused peer. |
| // TODO(unknown): would be nice to be able to poll the leader with an RPC like |
| // GetLeaderStatus() which could tell us whether it has made any requests |
| // since the log GC. |
| SleepFor(MonoDelta::FromSeconds(2)); |
| |
| // Make a note of whatever the current term of the cluster is, |
| // before we resume the follower. |
| { |
| OpId op_id; |
| ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout, |
| &op_id)); |
| if (orig_term) *orig_term = op_id.term(); |
| LOG(INFO) << "Servers converged with original term " << op_id.term(); |
| } |
| |
| if (tserver_behavior == BehindWalGcBehavior::STOP_CONTINUE) { |
| // Resume the follower. |
| LOG(INFO) << "Resuming " << replica->uuid(); |
| ASSERT_OK(replica_ets->Resume()); |
| } else if (tserver_behavior == BehindWalGcBehavior::SHUTDOWN_RESTART) { |
| LOG(INFO) << "Restarting " << replica->uuid(); |
| ASSERT_OK(replica_ets->Restart()); |
| } |
| |
| // Make sure the involved servsers didn't crash. |
| for (const auto& e: tablet_servers) { |
| const auto& uuid = e.first; |
| if (tserver_behavior == BehindWalGcBehavior::SHUTDOWN && |
| uuid == replica->uuid()) { |
| ASSERT_TRUE(cluster_->tablet_server_by_uuid(uuid)->IsShutdown()) |
| << "Tablet server " << uuid << " is not shutdown"; |
| } else if (tserver_behavior != BehindWalGcBehavior::DO_NOT_TAMPER) { |
| ASSERT_TRUE(cluster_->tablet_server_by_uuid(uuid)->IsProcessAlive()) |
| << "Tablet server " << uuid << " crashed"; |
| } |
| } |
| } |
| |
| Status RaftConsensusITestBase::GetTermMetricValue(ExternalTabletServer* ts, |
| int64_t *term) { |
| return GetInt64Metric(ts->bound_http_hostport(), |
| &METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term, |
| "value", term); |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |
| |