| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstdint> |
| #include <functional> |
| #include <iterator> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <thread> |
| #include <tuple> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| #include <kudu/gutil/strings/util.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus.proxy.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/fs/dir_manager.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/master/sys_catalog.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/mini-cluster/mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/tools/tool_test_util.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/env_util.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/net/socket.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_string(fs_wal_dir); |
| DECLARE_string(fs_data_dirs); |
| |
| METRIC_DECLARE_histogram(log_gc_duration); |
| METRIC_DECLARE_entity(tablet); |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduColumnSchema; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalDaemonOptions; |
| using kudu::cluster::ExternalMaster; |
| using kudu::cluster::ExternalMiniCluster; |
| using kudu::cluster::ExternalMiniClusterOptions; |
| using kudu::cluster::MiniCluster; |
| using kudu::consensus::ConsensusServiceProxy; |
| using kudu::consensus::HealthReportPB; |
| using kudu::consensus::LeaderStepDownRequestPB; |
| using kudu::consensus::LeaderStepDownResponsePB; |
| using kudu::consensus::RaftConfigPB; |
| using kudu::consensus::RaftPeerPB; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::RpcController; |
| using std::atomic; |
| using std::map; |
| using std::optional; |
| using std::string; |
| using std::thread; |
| using std::tuple; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace master { |
| |
| namespace { |
| Status CreateTableWithClient(KuduClient* client, const std::string& table_name) { |
| KuduSchema schema; |
| KuduSchemaBuilder b; |
| b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); |
| RETURN_NOT_OK(b.Build(&schema)); |
| unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); |
| return table_creator->table_name(table_name) |
| .schema(&schema) |
| .set_range_partition_columns({ "key" }) |
| .num_replicas(1) |
| .Create(); |
| } |
| Status CreateTable(ExternalMiniCluster* cluster, |
| const std::string& table_name) { |
| shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(cluster->CreateClient(nullptr, &client)); |
| return CreateTableWithClient(client.get(), table_name); |
| } |
| |
| Status ReserveSocketForMaster(int master_idx, unique_ptr<Socket>* socket, |
| Sockaddr* addr, HostPort* hp) { |
| unique_ptr<Socket> s; |
| Sockaddr a; |
| RETURN_NOT_OK(MiniCluster::ReserveDaemonSocket(MiniCluster::MASTER, master_idx, |
| kDefaultBindMode, &s)); |
| RETURN_NOT_OK(s->GetSocketAddress(&a)); |
| *socket = std::move(s); |
| *addr = a; |
| *hp = HostPort(a); |
| return Status::OK(); |
| } |
| |
| // Functor that takes a leader_master_idx and runs the desired master RPC against |
| // the leader master returning the RPC status and the optional MasterErrorPB::Code. |
| typedef std::function< |
| std::pair<Status, optional<MasterErrorPB::Code>>(int leader_master_idx)> MasterRPC; |
| |
| // Helper function that runs the master RPC against the leader master and retries the RPC |
| // if the expected leader master returns NOT_THE_LEADER error due to leadership change. |
| // Returns a single combined Status: |
| // - RPC return status if not OK. |
| // - IllegalState for a master response error other than NOT_THE_LEADER error. |
| // - TimedOut if all attempts to run the RPC against leader master are exhausted. |
| // - OK if the master RPC is successful. |
| Status RunLeaderMasterRPC(const MasterRPC& master_rpc, ExternalMiniCluster* cluster) { |
| int64_t time_left_to_sleep_msecs = 2000; |
| while (time_left_to_sleep_msecs > 0) { |
| int leader_master_idx; |
| RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx)); |
| const auto& rpc_result = master_rpc(leader_master_idx); |
| RETURN_NOT_OK(rpc_result.first); |
| const auto& master_error = rpc_result.second; |
| if (!master_error) { |
| return Status::OK(); |
| } |
| if (master_error != MasterErrorPB::NOT_THE_LEADER) { |
| // Some other master error. |
| return Status::IllegalState(Substitute("Master error: $0"), |
| MasterErrorPB_Code_Name(*master_error)); |
| } |
| // NOT_THE_LEADER error, so retry after some duration. |
| static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100); |
| SleepFor(kSleepDuration); |
| time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds(); |
| } |
| return Status::TimedOut("Failed contacting the right leader master after multiple attempts"); |
| } |
| |
| // Run ListMasters RPC, retrying on leadership change, returning the response |
| // in 'resp'. |
| Status RunListMasters(ListMastersResponsePB* resp, ExternalMiniCluster* cluster) { |
| auto list_masters = [&] (int leader_master_idx) { |
| ListMastersRequestPB req; |
| RpcController rpc; |
| Status s = cluster->master_proxy(leader_master_idx)->ListMasters(req, resp, &rpc); |
| optional<MasterErrorPB::Code> err_code; |
| if (resp->has_error()) { |
| err_code.emplace(resp->error().code()); |
| } |
| return std::make_pair(s, err_code); |
| }; |
| return RunLeaderMasterRPC(list_masters, cluster); |
| } |
| |
| // Verify the 'cluster' contains 'num_masters' on all masters, and that they |
| // are all voters. Returns an error if the expected state is not present. |
| // |
| // This should be used instead of VerifyVoterMastersForCluster() if it is |
| // required that all masters have accepted the config change. E.g. tests that |
| // restart a cluster after adding a master should verify that all masters agree |
| // before restarting, in case lagging masters start up with stale configs. |
| // |
| // TODO(awong): we should be more robust to starting up with mismatched on-disk |
| // configs, if we can help it. |
| Status VerifyVotersOnAllMasters(int num_masters, ExternalMiniCluster* cluster) { |
| for (int i = 0; i < cluster->num_masters(); i++) { |
| ListMastersResponsePB resp; |
| ListMastersRequestPB req; |
| RpcController rpc; |
| RETURN_NOT_OK(cluster->master_proxy(i)->ListMasters(req, &resp, &rpc)); |
| if (num_masters != resp.masters_size()) { |
| return Status::IllegalState(Substitute("expected $0 masters but got $1", |
| num_masters, resp.masters_size())); |
| } |
| for (const auto& master : resp.masters()) { |
| if ((master.role() != RaftPeerPB::LEADER && master.role() != RaftPeerPB::FOLLOWER) || |
| master.member_type() != RaftPeerPB::VOTER || |
| master.registration().rpc_addresses_size() != 1) { |
| return Status::IllegalState(Substitute("bad master: $0", SecureShortDebugString(master))); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Verify the ExternalMiniCluster 'cluster' contains 'num_masters' overall and |
| // are all VOTERS. Populates the new master addresses in 'master_hps', if not |
| // nullptr. Returns an error if the expected state is not present. |
| // |
| // This should be used instead of VerifyVotersOnAllMasters() if the test does |
| // not need to wait for all masters to agree on the config. |
| Status VerifyVoterMastersForCluster(int num_masters, vector<HostPort>* master_hps, |
| ExternalMiniCluster* cluster) { |
| ListMastersResponsePB resp; |
| RETURN_NOT_OK(RunListMasters(&resp, cluster)); |
| if (num_masters != resp.masters_size()) { |
| return Status::IllegalState(Substitute("expected $0 masters but got $1", |
| num_masters, resp.masters_size())); |
| } |
| vector<HostPort> hps; |
| for (const auto& master : resp.masters()) { |
| if ((master.role() != RaftPeerPB::LEADER && master.role() != RaftPeerPB::FOLLOWER) || |
| master.member_type() != RaftPeerPB::VOTER || |
| master.registration().rpc_addresses_size() != 1) { |
| return Status::IllegalState(Substitute("bad master: $0", SecureShortDebugString(master))); |
| } |
| hps.emplace_back(HostPortFromPB(master.registration().rpc_addresses(0))); |
| } |
| if (master_hps) { |
| *master_hps = std::move(hps); |
| } |
| return Status::OK(); |
| } |
| |
| // Initiates leadership transfer to the specified master returning status of |
| // the request. The request is performed synchronously, though the transfer of |
| // leadership is asynchronous -- callers need to wait to ensure leadership is |
| // actually transferred. |
| Status TransferMasterLeadershipAsync(ExternalMiniCluster* cluster, const string& master_uuid) { |
| int leader_master_idx; |
| RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx)); |
| auto leader_master_addr = cluster->master(leader_master_idx)->bound_rpc_addr(); |
| ConsensusServiceProxy consensus_proxy(cluster->messenger(), leader_master_addr, |
| leader_master_addr.host()); |
| LeaderStepDownRequestPB req; |
| req.set_dest_uuid(cluster->master(leader_master_idx)->uuid()); |
| req.set_tablet_id(master::SysCatalogTable::kSysCatalogTabletId); |
| req.set_new_leader_uuid(master_uuid); |
| req.set_mode(consensus::GRACEFUL); |
| LeaderStepDownResponsePB resp; |
| RpcController rpc; |
| RETURN_NOT_OK(consensus_proxy.LeaderStepDown(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| // Transfers leadership among masters in the 'cluster' to the specified 'new_master_uuid' |
| // verifies the transfer is successful. |
| void TransferMasterLeadership(ExternalMiniCluster* cluster, const string& new_master_uuid) { |
| ASSERT_OK(TransferMasterLeadershipAsync(cluster, new_master_uuid)); |
| // It takes some time for the leadership transfer to complete, hence the |
| // ASSERT_EVENTUALLY. |
| ASSERT_EVENTUALLY([&] { |
| int leader_master_idx = -1; |
| ASSERT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx)); |
| ASSERT_EQ(new_master_uuid, cluster->master(leader_master_idx)->uuid()); |
| }); |
| } |
| |
| // Fetch uuid of the specified 'fs_wal_dir' and 'fs_data_dirs'. |
| Status GetFsUuid(const string& fs_wal_dir, const vector<string>& fs_data_dirs, string* uuid) { |
| google::FlagSaver saver; |
| FLAGS_fs_wal_dir = fs_wal_dir; |
| FLAGS_fs_data_dirs = JoinStrings(fs_data_dirs, ","); |
| FsManagerOpts fs_opts; |
| fs_opts.read_only = true; |
| fs_opts.update_instances = fs::UpdateInstanceBehavior::DONT_UPDATE; |
| FsManager fs_manager(Env::Default(), std::move(fs_opts)); |
| RETURN_NOT_OK(fs_manager.PartialOpen()); |
| *uuid = fs_manager.uuid(); |
| return Status::OK(); |
| } |
| |
| } // anonymous namespace |
| |
| // Test class for testing addition/removal of masters to a Kudu cluster. |
| class DynamicMultiMasterTest : public KuduTest { |
| protected: |
| typedef map<string, string> EnvVars; |
| |
| void SetUpWithNumMasters(int num_masters) { |
| // Initial number of masters in the cluster before adding a master. |
| orig_num_masters_ = num_masters; |
| |
| // Reserving a port upfront for the new master that'll be added to the cluster. |
| ASSERT_OK(ReserveSocketForMaster(/*index*/orig_num_masters_, &reserved_socket_, |
| &reserved_addr_, &reserved_hp_)); |
| } |
| |
| void StartCluster(const vector<string>& extra_master_flags = {}, |
| bool supply_single_master_addr = true) { |
| opts_.num_masters = orig_num_masters_; |
| opts_.supply_single_master_addr = supply_single_master_addr; |
| opts_.extra_master_flags = extra_master_flags; |
| opts_.extra_master_flags.emplace_back("--master_auto_join_cluster=false"); |
| |
| cluster_.reset(new ExternalMiniCluster(opts_)); |
| ASSERT_OK(cluster_->Start()); |
| ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(), |
| MonoDelta::FromSeconds(5))); |
| } |
| |
| // Bring up a cluster with bunch of tables and ensure the system catalog WAL |
| // has been GC'ed. |
| // Out parameter 'master_hps' returns the HostPort of the masters in the original |
| // cluster. |
| void StartClusterWithSysCatalogGCed(vector<HostPort>* master_hps, |
| const vector<string>& extra_flags = {}) { |
| // Using low values of log flush threshold and segment size to trigger GC of the |
| // sys catalog WAL |
| vector<string> flags = {"--flush_threshold_secs=0", "--log_segment_size_mb=1"}; |
| flags.insert(flags.end(), extra_flags.begin(), extra_flags.end()); |
| NO_FATALS(StartCluster(flags)); |
| |
| // Verify that masters are running as VOTERs and collect their addresses to be used |
| // for starting the new master. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, master_hps)); |
| |
| // Function to fetch the GC count of the system catalog WAL. |
| auto get_sys_catalog_wal_gc_count = [&] (int master_idx) { |
| int64_t sys_catalog_wal_gc_count = 0; |
| CHECK_OK(itest::GetInt64Metric(cluster_->master(master_idx)->bound_http_hostport(), |
| &METRIC_ENTITY_tablet, |
| master::SysCatalogTable::kSysCatalogTabletId, |
| &METRIC_log_gc_duration, |
| "total_count", |
| &sys_catalog_wal_gc_count)); |
| return sys_catalog_wal_gc_count; |
| }; |
| |
| vector<int64_t> orig_gc_count(orig_num_masters_); |
| for (int master_idx = 0; master_idx < orig_num_masters_; master_idx++) { |
| orig_gc_count[master_idx] = get_sys_catalog_wal_gc_count(master_idx); |
| } |
| |
| // Function to compute whether all masters have updated the system catalog WAL GC count. |
| // Ideally we could just check against the leader master but the leader master could |
| // potentially change hence checking across all masters. |
| auto all_masters_updated_wal_gc_count = [&] { |
| int num_masters_gc_updated = 0; |
| for (int master_idx = 0; master_idx < orig_num_masters_; master_idx++) { |
| if (get_sys_catalog_wal_gc_count(master_idx) > orig_gc_count[master_idx]) { |
| num_masters_gc_updated++; |
| } |
| } |
| return num_masters_gc_updated == orig_num_masters_; |
| }; |
| |
| // Create a bunch of tables to ensure sys catalog WAL gets GC'ed. |
| // Need to create around 1k tables even with lowest flush threshold and log segment size. |
| int i; |
| bool wal_gc_counts_updated = false; |
| for (i = 1; i < 1000; i++) { |
| if (all_masters_updated_wal_gc_count()) { |
| wal_gc_counts_updated = true; |
| break; |
| } |
| string table_name = Substitute("Table.$0.$1", CURRENT_TEST_NAME(), std::to_string(i)); |
| ASSERT_OK(CreateTable(cluster_.get(), table_name)); |
| } |
| LOG(INFO) << "Number of tables created: " << i - 1; |
| if (wal_gc_counts_updated) { |
| // We are done here and no need to wait further. |
| return; |
| } |
| |
| MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(2); |
| while (MonoTime::Now() < deadline) { |
| if (all_masters_updated_wal_gc_count()) { |
| wal_gc_counts_updated = true; |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(100)); |
| } |
| ASSERT_TRUE(wal_gc_counts_updated) << "Timed out waiting for system catalog WAL to be GC'ed"; |
| } |
| |
| // Brings up a new master 'new_master_hp' where 'master_hps' contains master addresses including |
| // the new master to be added at the index 'new_master_idx' in the ExternalMiniCluster. |
| void StartNewMaster(const vector<HostPort>& master_hps, |
| const HostPort& new_master_hp, |
| int new_master_idx, |
| scoped_refptr<ExternalMaster>* new_master_out) { |
| vector<string> master_addresses; |
| master_addresses.reserve(master_hps.size()); |
| for (const auto& hp : master_hps) { |
| master_addresses.emplace_back(hp.ToString()); |
| } |
| |
| ExternalDaemonOptions new_master_opts; |
| ASSERT_OK(BuildMasterOpts(new_master_idx, new_master_hp, &new_master_opts)); |
| auto& flags = new_master_opts.extra_flags; |
| flags.insert(flags.end(), { |
| "--master_addresses=" + JoinStrings(master_addresses, ","), |
| "--master_address_add_new_master=" + new_master_hp.ToString(), |
| "--master_auto_join_cluster=false", |
| }); |
| |
| LOG(INFO) << "Bringing up the new master at: " << new_master_hp.ToString(); |
| scoped_refptr<ExternalMaster> master = new ExternalMaster(new_master_opts); |
| ASSERT_OK(master->Start()); |
| ASSERT_OK(master->WaitForCatalogManager()); |
| |
| Sockaddr new_master_addr; |
| ASSERT_OK(SockaddrFromHostPort(new_master_hp, &new_master_addr)); |
| MasterServiceProxy new_master_proxy(new_master_opts.messenger, new_master_addr, |
| new_master_hp.host()); |
| |
| GetMasterRegistrationRequestPB req; |
| GetMasterRegistrationResponsePB resp; |
| RpcController rpc; |
| ASSERT_OK(new_master_proxy.GetMasterRegistration(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(RaftPeerPB::NON_VOTER, resp.member_type()); |
| ASSERT_EQ(RaftPeerPB::LEARNER, resp.role()); |
| *new_master_out = std::move(master); |
| } |
| |
| void VerifyVoterMasters(int num_masters, vector<HostPort>* master_hps = nullptr, |
| ExternalMiniCluster* cluster = nullptr) { |
| if (cluster == nullptr) { |
| cluster = cluster_.get(); |
| } |
| NO_FATALS(VerifyVoterMastersForCluster(num_masters, master_hps, cluster)); |
| } |
| |
| // Fetch a follower (non-leader) master index from the cluster. |
| Status GetFollowerMasterIndex(int* follower_master_idx) { |
| int leader_master_idx; |
| RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| int follower = -1; |
| for (int i = 0; i < cluster_->num_masters(); i++) { |
| if (i != leader_master_idx) { |
| follower = i; |
| break; |
| } |
| } |
| if (follower == -1) { |
| return Status::NotFound("No follower master found"); |
| } |
| *follower_master_idx = follower; |
| return Status::OK(); |
| } |
| |
| // Builds and returns ExternalDaemonOptions used to add master with address 'rpc_addr' and |
| // 'master_idx' which helps with naming master's directories. |
| // Output 'kerberos_env_vars' parameter must be supplied for a secure cluster i.e. when Kerberos |
| // is enabled and returns Kerberos related environment variables. |
| Status BuildMasterOpts(int master_idx, const HostPort& rpc_addr, |
| ExternalDaemonOptions* master_opts, |
| EnvVars* kerberos_env_vars = nullptr) { |
| // Start with an existing master daemon's options, but modify them for use in a new master. |
| const string new_master_id = Substitute("master-$0", master_idx); |
| ExternalDaemonOptions opts = cluster_->master(0)->opts(); |
| opts.rpc_bind_address = rpc_addr; |
| opts.wal_dir = cluster_->GetWalPath(new_master_id); |
| opts.data_dirs = cluster_->GetDataPaths(new_master_id); |
| opts.log_dir = cluster_->GetLogPath(new_master_id); |
| if (opts_.enable_kerberos) { |
| CHECK(kerberos_env_vars); |
| vector<string> flags; |
| RETURN_NOT_OK(cluster::ExternalDaemon::CreateKerberosConfig( |
| cluster_->kdc(), opts_.principal, rpc_addr.host(), &flags, kerberos_env_vars)); |
| // Inserting the Kerberos related flags at the end will override flags from the master |
| // which was used as a basis for the new master. |
| opts.extra_flags.insert(opts.extra_flags.end(), std::make_move_iterator(flags.begin()), |
| std::make_move_iterator(flags.end())); |
| } |
| *master_opts = std::move(opts); |
| return Status::OK(); |
| } |
| |
| // Adds the specified master to the cluster using the CLI tool. |
| // Unset 'rpc_bind_address' in 'opts' can be used to indicate to not supply master address. |
| // Optional 'env_vars' can be used to set environment variables while running kudu tool. |
| // Optional 'wait_secs' can be used to supply wait timeout to the master add CLI tool. |
| // Optional 'kudu_binary' can be used to supply the path to the kudu binary. |
| // Returns generic RuntimeError() on failure with the actual error in the optional 'err' |
| // output parameter. |
| Status AddMasterToClusterUsingCLITool(const ExternalDaemonOptions& opts, string* err = nullptr, |
| EnvVars env_vars = {}, int wait_secs = 10, |
| const string& kudu_binary = "") { |
| auto hps = cluster_->master_rpc_addrs(); |
| vector<string> addresses; |
| addresses.reserve(hps.size()); |
| for (const auto& hp : hps) { |
| addresses.emplace_back(hp.ToString()); |
| } |
| |
| vector<string> cmd = {"master", "add", JoinStrings(addresses, ",")}; |
| if (opts.rpc_bind_address.Initialized()) { |
| cmd.emplace_back(opts.rpc_bind_address.ToString()); |
| } |
| |
| vector<string> new_master_flags = ExternalMaster::GetMasterFlags(opts); |
| cmd.insert(cmd.end(), std::make_move_iterator(new_master_flags.begin()), |
| std::make_move_iterator(new_master_flags.end())); |
| cmd.insert(cmd.end(), opts.extra_flags.begin(), opts.extra_flags.end()); |
| |
| if (wait_secs != 0) { |
| cmd.emplace_back("-wait_secs=" + std::to_string(wait_secs)); |
| } |
| if (!kudu_binary.empty()) { |
| cmd.emplace_back("-kudu_abs_path=" + kudu_binary); |
| } |
| |
| RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), opts.log_dir)); |
| Status s = tools::RunKuduTool(cmd, nullptr, err, "", std::move(env_vars)); |
| if (!s.ok() && err != nullptr) { |
| LOG(INFO) << "Add master stderr: " << *err; |
| } |
| RETURN_NOT_OK(s); |
| return Status::OK(); |
| } |
| |
| // Removes the specified master from the cluster using the CLI tool. |
| // Unset 'master_to_remove' can be used to indicate to not supply master address. |
| // Returns generic RuntimeError() on failure with the actual error in the optional 'err' |
| // output parameter. |
| Status RemoveMasterFromClusterUsingCLITool(const HostPort& master_to_remove, |
| string* err = nullptr, |
| const string& master_uuid = "") { |
| auto hps = cluster_->master_rpc_addrs(); |
| vector<string> addresses; |
| addresses.reserve(hps.size()); |
| for (const auto& hp : hps) { |
| addresses.emplace_back(hp.ToString()); |
| } |
| |
| vector<string> args = {"master", "remove", JoinStrings(addresses, ",")}; |
| if (master_to_remove.Initialized()) { |
| args.push_back(master_to_remove.ToString()); |
| } |
| if (!master_uuid.empty()) { |
| args.push_back("--master_uuid=" + master_uuid); |
| } |
| RETURN_NOT_OK(tools::RunKuduTool(args, nullptr, err)); |
| return cluster_->RemoveMaster(master_to_remove); |
| } |
| |
| // Fetch consensus state of the leader master. |
| void GetLeaderMasterConsensusState(RaftConfigPB* consensus_config) { |
| int leader_master_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| auto leader_master_addr = cluster_->master(leader_master_idx)->bound_rpc_addr(); |
| ConsensusServiceProxy consensus_proxy(cluster_->messenger(), leader_master_addr, |
| leader_master_addr.host()); |
| consensus::GetConsensusStateRequestPB req; |
| consensus::GetConsensusStateResponsePB resp; |
| RpcController rpc; |
| req.set_dest_uuid(cluster_->master(leader_master_idx)->uuid()); |
| req.set_report_health(consensus::INCLUDE_HEALTH_REPORT); |
| ASSERT_OK(consensus_proxy.GetConsensusState(req, &resp, &rpc)); |
| ASSERT_FALSE(resp.has_error()); |
| ASSERT_EQ(1, resp.tablets_size()); |
| |
| // Lookup the new_master from the consensus state of the system catalog. |
| const auto& sys_catalog = resp.tablets(0); |
| ASSERT_EQ(master::SysCatalogTable::kSysCatalogTabletId, sys_catalog.tablet_id()); |
| const auto& cstate = sys_catalog.cstate(); |
| *consensus_config = cstate.has_pending_config() ? |
| cstate.pending_config() : cstate.committed_config(); |
| } |
| |
| void VerifyDeadMasterInSpecifiedState(const string& dead_master_uuid, |
| HealthReportPB::HealthStatus expected_state) { |
| RaftConfigPB config; |
| NO_FATALS(GetLeaderMasterConsensusState(&config)); |
| ASSERT_EQ(orig_num_masters_, config.peers_size()); |
| bool dead_master_found = false; |
| for (const auto& peer : config.peers()) { |
| if (peer.permanent_uuid() == dead_master_uuid) { |
| dead_master_found = true; |
| ASSERT_EQ(expected_state, peer.health_report().overall_health()); |
| break; |
| } |
| } |
| ASSERT_TRUE(dead_master_found); |
| } |
| |
| // Verification steps after the new master has been added successfully and it's promoted |
| // as VOTER. The supplied 'master_hps' includes the new_master as well. |
| void VerifyClusterAfterMasterAddition(const vector<HostPort>& master_hps, |
| int expected_num_masters) { |
| // Collect information about the cluster for verification later before shutting |
| // it down. |
| UnorderedHostPortSet master_hps_set(master_hps.begin(), master_hps.end()); |
| ASSERT_EQ(master_hps.size(), master_hps_set.size()) << "Duplicates found in master_hps"; |
| |
| unordered_set<string> master_uuids; |
| for (int i = 0; i < cluster_->num_masters(); i++) { |
| master_uuids.emplace(cluster_->master(i)->uuid()); |
| } |
| string new_master_uuid; |
| ASSERT_OK(GetFsUuid(new_master_opts_.wal_dir, new_master_opts_.data_dirs, &new_master_uuid)); |
| master_uuids.emplace(new_master_uuid); |
| |
| // Shutdown the cluster and the new master daemon process. |
| // This allows ExternalMiniCluster to manage the newly added master and allows |
| // client to connect to the new master if it's elected the leader. |
| LOG(INFO) << "Shutting down the old cluster"; |
| cluster_.reset(); |
| |
| LOG(INFO) << "Bringing up the migrated cluster"; |
| opts_.num_masters = expected_num_masters; |
| opts_.master_rpc_addresses = master_hps; |
| ExternalMiniCluster migrated_cluster(opts_); |
| ASSERT_OK(migrated_cluster.Start()); |
| for (int i = 0; i < migrated_cluster.num_masters(); i++) { |
| ASSERT_OK(migrated_cluster.master(i)->WaitForCatalogManager()); |
| } |
| |
| // Verify the cluster still has the same masters. |
| { |
| ListMastersResponsePB resp; |
| ASSERT_OK(RunListMasters(&resp, &migrated_cluster)); |
| ASSERT_EQ(expected_num_masters, resp.masters_size()); |
| |
| UnorderedHostPortSet hps_found; |
| unordered_set<string> uuids_found; |
| for (const auto& master : resp.masters()) { |
| ASSERT_EQ(RaftPeerPB::VOTER, master.member_type()); |
| ASSERT_TRUE(master.role() == RaftPeerPB::LEADER || master.role() == RaftPeerPB::FOLLOWER); |
| ASSERT_EQ(1, master.registration().rpc_addresses_size()); |
| HostPort actual_hp = HostPortFromPB(master.registration().rpc_addresses(0)); |
| ASSERT_TRUE(ContainsKey(master_hps_set, actual_hp)); |
| hps_found.insert(actual_hp); |
| ASSERT_TRUE(ContainsKey(master_uuids, master.instance_id().permanent_uuid())); |
| uuids_found.emplace(master.instance_id().permanent_uuid()); |
| } |
| ASSERT_EQ(master_hps_set, hps_found); |
| ASSERT_EQ(master_uuids, uuids_found); |
| } |
| |
| // Transfer leadership to the new master. |
| LOG(INFO) << "Transferring leadership to master: " << new_master_uuid; |
| NO_FATALS(TransferMasterLeadership(&migrated_cluster, new_master_uuid)); |
| |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(migrated_cluster.CreateClient(nullptr, &client)); |
| |
| ClusterVerifier cv(&migrated_cluster); |
| NO_FATALS(cv.CheckCluster()); |
| LOG(INFO) << "Verifying the first table"; |
| NO_FATALS(cv.CheckRowCount(kTableName, ClusterVerifier::EXACTLY, 0)); |
| |
| LOG(INFO) << "Creating and verifying the second table"; |
| // Perform an operation that requires replication to masters. |
| ASSERT_OK(CreateTable(&migrated_cluster, "second_table")); |
| NO_FATALS(cv.CheckRowCount("second_table", ClusterVerifier::EXACTLY, 0)); |
| |
| // Pause one master at a time and create table at the same time which will allow |
| // new leader to be elected if the paused master is a leader. |
| // Need at least 3 masters to form consensus and elect a new leader. |
| if (expected_num_masters >= 3) { |
| LOG(INFO) << "Pausing and resuming individual masters"; |
| string table_name = kTableName; |
| for (int i = 0; i < expected_num_masters; i++) { |
| auto* master = migrated_cluster.master(i); |
| LOG(INFO) << Substitute("Pausing master $0, $1", master->uuid(), |
| master->bound_rpc_hostport().ToString()); |
| ASSERT_OK(master->Pause()); |
| cluster::ScopedResumeExternalDaemon resume_daemon(master); |
| |
| // We can run into table not found error in cases where the |
| // previously paused master that's leader of prior term resumes |
| // and the up to date follower doesn't become leader and the resumed |
| // master from previous term isn't up to date. See KUDU-3266 for details. |
| ASSERT_EVENTUALLY([&] { |
| NO_FATALS(cv.CheckRowCount(table_name, ClusterVerifier::EXACTLY, 0)); |
| }); |
| |
| // See MasterFailoverTest.TestCreateTableSync to understand why we must |
| // check for IsAlreadyPresent as well. |
| table_name = Substitute("table-$0", i); |
| Status s = CreateTable(&migrated_cluster, table_name); |
| ASSERT_TRUE(s.ok() || s.IsAlreadyPresent()); |
| } |
| } |
| } |
| |
| // Function to prevent leadership changes among masters for quick tests. |
| void DisableMasterLeadershipTransfer() { |
| for (int i = 0 ; i < cluster_->num_masters(); i++) { |
| // Starting the cluster with following flag leads to a case sometimes |
| // wherein no leader gets elected leading to failure in ConnectToMaster() RPC. |
| // So instead set the flag after the cluster is running. |
| ASSERT_OK(cluster_->SetFlag(cluster_->master(i), |
| "leader_failure_max_missed_heartbeat_periods", "10.0")); |
| } |
| } |
| |
| // Helper function for recovery tests that shuts down and deletes state of a follower master |
| // and removes it from the Raft configuration. |
| // Output parameters: |
| // dead_master_idx: Index of the follower master selected for recovery in the |
| // ExternalMiniCluster |
| // dead_master_hp: HostPort of the follower master selected for recovery |
| // src_master_hp: HostPort of an existing master that can be used as a source for recovery |
| void FailAndRemoveFollowerMaster(const vector<HostPort>& master_hps, int* dead_master_idx, |
| HostPort* dead_master_hp, HostPort* src_master_hp) { |
| // We'll be selecting a follower master to be shutdown to simulate a dead master |
| // and to prevent the test from being flaky we disable master leadership change. |
| NO_FATALS(DisableMasterLeadershipTransfer()); |
| |
| int master_to_recover_idx = -1; |
| ASSERT_OK(GetFollowerMasterIndex(&master_to_recover_idx)); |
| ASSERT_NE(master_to_recover_idx, -1); |
| scoped_refptr<ExternalMaster> master_to_recover(cluster_->master(master_to_recover_idx)); |
| const auto master_to_recover_hp = master_to_recover->bound_rpc_hostport(); |
| |
| LOG(INFO) << Substitute("Shutting down and deleting the state of the master to be recovered " |
| "HostPort: $0, UUID: $1, index : $2", master_to_recover_hp.ToString(), |
| master_to_recover->uuid(), master_to_recover_idx); |
| |
| NO_FATALS(master_to_recover->Shutdown()); |
| ASSERT_OK(master_to_recover->DeleteFromDisk()); |
| |
| LOG(INFO) << "Detecting transition to terminal FAILED state"; |
| ASSERT_EVENTUALLY([&] { |
| VerifyDeadMasterInSpecifiedState(master_to_recover->uuid(), HealthReportPB::FAILED); |
| }); |
| |
| // Verify the master to be removed is part of the list of masters. |
| ASSERT_NE(std::find(master_hps.begin(), master_hps.end(), master_to_recover_hp), |
| master_hps.end()); |
| |
| // Source master to copy system catalog out of at least 2 masters in the cluster. |
| // Recovery tests start with at least 2 masters. |
| // Need to capture the source master before removing master as once the master |
| // is removed it won't be tracked by ExternalMiniCluster and the indices would change. |
| ASSERT_GE(cluster_->num_masters(), 2); |
| int src_master_idx = master_to_recover_idx == 0 ? 1 : 0; |
| *src_master_hp = cluster_->master(src_master_idx)->bound_rpc_hostport(); |
| |
| ASSERT_OK(RemoveMasterFromClusterUsingCLITool(master_to_recover_hp)); |
| |
| // Verify we have one master less and the desired master was removed. |
| vector<HostPort> updated_master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1, &updated_master_hps)); |
| UnorderedHostPortSet expected_master_hps(master_hps.begin(), master_hps.end()); |
| expected_master_hps.erase(master_to_recover_hp); |
| UnorderedHostPortSet actual_master_hps(updated_master_hps.begin(), updated_master_hps.end()); |
| ASSERT_EQ(expected_master_hps, actual_master_hps); |
| |
| ClusterVerifier cv(cluster_.get()); |
| NO_FATALS(cv.CheckCluster()); |
| NO_FATALS(cv.CheckRowCount(kTableName, ClusterVerifier::EXACTLY, 0)); |
| |
| // Set the remaining output parameters. |
| *dead_master_idx = master_to_recover_idx; |
| *dead_master_hp = master_to_recover_hp; |
| } |
| |
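// Number of masters the cluster is set up with; tests derive the expected voter count
// after adding or removing a master from it (e.g. 'orig_num_masters_ - 1').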
| int orig_num_masters_; |
| ExternalMiniClusterOptions opts_; |
| unique_ptr<ExternalMiniCluster> cluster_; |
| |
| // Socket, HostPort, proxy etc. for the new master to be added |
| unique_ptr<Socket> reserved_socket_; |
| Sockaddr reserved_addr_; |
| HostPort reserved_hp_; |
| ExternalDaemonOptions new_master_opts_; |
| |
| static const char* const kTableName; |
| }; |
| |
| const char* const DynamicMultiMasterTest::kTableName = "first_table"; |
| |
| // Parameterized DynamicMultiMasterTest class that works with different initial number of masters |
| // and secure/unsecure clusters. |
| class ParameterizedAddMasterTest : public DynamicMultiMasterTest, |
| public ::testing::WithParamInterface<tuple<int, bool>> { |
| public: |
| void SetUp() override { |
| NO_FATALS(SetUpWithNumMasters(std::get<0>(GetParam()))); |
| opts_.enable_kerberos = std::get<1>(GetParam()); |
| } |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(, ParameterizedAddMasterTest, |
| ::testing::Combine( |
| // Initial number of masters in the cluster before adding a new |
| // master |
| ::testing::Values(1, 2), |
| // Whether Kerberos is enabled |
| ::testing::Bool())); |
| |
| // This test starts a cluster, creates a table and then adds a new master. |
| // For a system catalog with little data, the new master can be caught up from WAL and |
| // promoted to a VOTER without requiring tablet copy. |
| TEST_P(ParameterizedAddMasterTest, TestAddMasterCatchupFromWAL) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| NO_FATALS(StartCluster()); |
| |
| // Verify that masters are running as VOTERs and collect their addresses to be used |
| // for starting the new master. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| ASSERT_OK(CreateTable(cluster_.get(), kTableName)); |
| |
| // Add new master to the cluster. |
| { |
| string err; |
| EnvVars env_vars; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &new_master_opts_, &env_vars)); |
| ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err, std::move(env_vars), |
| 4 /* wait_secs */, tools::GetKuduToolAbsolutePath())); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 successfully caught up from WAL.", |
| new_master_opts_.rpc_bind_address.ToString())); |
| } |
| |
| // Adding the same master again should print a message but not throw an error. |
| { |
| string err; |
| ExternalDaemonOptions opts; |
| EnvVars env_vars; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts, &env_vars)); |
| ASSERT_OK(AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars))); |
| ASSERT_STR_CONTAINS(err, "Master already present"); |
| } |
| |
| // Adding one of the running former masters will throw an error. |
| { |
| string err; |
| const auto& hp = master_hps[0]; |
| ExternalDaemonOptions opts; |
| EnvVars env_vars; |
| ASSERT_OK(BuildMasterOpts(0, hp, &opts, &env_vars)); |
| Status s = AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars)); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 already running", hp.ToString())); |
| } |
| |
| master_hps.emplace_back(reserved_hp_); |
| NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_ + 1)); |
| } |
| |
| // This test goes through the workflow required to copy system catalog to the newly added master. |
| TEST_P(ParameterizedAddMasterTest, TestAddMasterSysCatalogCopy) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| vector<HostPort> master_hps; |
| NO_FATALS(StartClusterWithSysCatalogGCed(&master_hps)); |
| |
| ASSERT_OK(CreateTable(cluster_.get(), kTableName)); |
| |
| // Add new master to the cluster. |
| { |
| string err; |
| EnvVars env_vars; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &new_master_opts_, &env_vars)); |
| ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err, std::move(env_vars), |
| 4 /* wait_secs */)); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 could not be caught up from WAL.", |
| reserved_hp_.ToString())); |
| ASSERT_STR_CONTAINS(err, "Successfully copied system catalog and new master is healthy"); |
| } |
| |
| // Adding the same master again should print a message but not throw an error. |
| { |
| string err; |
| ExternalDaemonOptions opts; |
| EnvVars env_vars; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts, &env_vars)); |
| ASSERT_OK(AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars))); |
| ASSERT_STR_CONTAINS(err, "Master already present"); |
| } |
| |
| // Adding one of the running former masters will throw an error. |
| { |
| string err; |
| const auto& hp = master_hps[0]; |
| ExternalDaemonOptions opts; |
| EnvVars env_vars; |
| ASSERT_OK(BuildMasterOpts(0, hp, &opts, &env_vars)); |
| Status s = AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars)); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 already running", hp.ToString())); |
| } |
| |
| master_hps.emplace_back(reserved_hp_); |
| NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_ + 1)); |
| } |
| |
| |
| class ParameterizedRemoveMasterTest : public DynamicMultiMasterTest, |
| public ::testing::WithParamInterface<tuple<int, bool>> { |
| public: |
| void SetUp() override { |
| NO_FATALS(SetUpWithNumMasters(std::get<0>(GetParam()))); |
| } |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(, ParameterizedRemoveMasterTest, |
| ::testing::Combine( |
| // Initial number of masters in the cluster before removing a master |
| ::testing::Values(2, 3), |
| // Whether the master to be removed is dead/shutdown |
| ::testing::Bool())); |
| |
| // Tests removing a non-leader master from the cluster. |
| TEST_P(ParameterizedRemoveMasterTest, TestRemoveMaster) { |
| NO_FATALS(StartCluster({// Keeping RPC timeouts short to quickly detect downed servers. |
| // This will put the health status into an UNKNOWN state until the point |
| // where they are considered FAILED. |
| "--consensus_rpc_timeout_ms=2000", |
| "--follower_unavailable_considered_failed_sec=4"})); |
| |
| // Verify that existing masters are running as VOTERs. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| // When an ExternalMiniCluster is restarted after removal of a master then one of the |
| // remaining masters can get reassigned to the same wal dir which was previously assigned |
| // to the removed master. This causes problems during verification, so we always try to |
| // remove the last master in terms of index for test purposes. |
| int leader_master_idx = -1; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| ASSERT_NE(leader_master_idx, -1); |
| const int non_leader_master_idx = orig_num_masters_ - 1; |
| if (leader_master_idx == non_leader_master_idx) { |
| // Move the leader to the first master index |
| auto first_master_uuid = cluster_->master(0)->uuid(); |
| LOG(INFO) << "Transferring leadership to master: " << first_master_uuid; |
| NO_FATALS(TransferMasterLeadership(cluster_.get(), first_master_uuid)); |
| } |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| ASSERT_NE(leader_master_idx, non_leader_master_idx); |
| const auto master_to_remove = cluster_->master(non_leader_master_idx)->bound_rpc_hostport(); |
| const auto master_to_remove_uuid = cluster_->master(non_leader_master_idx)->uuid(); |
| |
| // A NO_OP operation is issued after assuming leadership so that ChangeConfig operation |
| // can be issued against the new leader in the current term. |
| // Don't know of a good way to wait/verify that the NO_OP operation has completed. Table |
| // creation helps with a new operation in the current term and is used later for verification. |
| // Hence creating a table after possible master leadership transfer and before initiating remove |
| // master ChangeConfig request. |
| ASSERT_OK(CreateTable(cluster_.get(), kTableName)); |
| |
| bool shutdown = std::get<1>(GetParam()); |
| if (shutdown) { |
| LOG(INFO) << "Shutting down the master to be removed"; |
| cluster_->master(non_leader_master_idx)->Shutdown(); |
| LOG(INFO) << "Detecting transition to terminal FAILED state"; |
| ASSERT_EVENTUALLY([&] { |
| VerifyDeadMasterInSpecifiedState(master_to_remove_uuid, HealthReportPB::FAILED); |
| }); |
| } |
| |
| // Verify the master to be removed is part of the list of masters. |
| ASSERT_NE(std::find(master_hps.begin(), master_hps.end(), master_to_remove), master_hps.end()); |
| ASSERT_OK(RemoveMasterFromClusterUsingCLITool(master_to_remove, nullptr, master_to_remove_uuid)); |
| |
| // Verify we have one master less and the desired master was removed. |
| vector<HostPort> updated_master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1, &updated_master_hps)); |
| UnorderedHostPortSet expected_master_hps(master_hps.begin(), master_hps.end()); |
| expected_master_hps.erase(master_to_remove); |
| UnorderedHostPortSet actual_master_hps(updated_master_hps.begin(), updated_master_hps.end()); |
| ASSERT_EQ(expected_master_hps, actual_master_hps); |
| |
| ClusterVerifier cv(cluster_.get()); |
| NO_FATALS(cv.CheckCluster()); |
| NO_FATALS(cv.CheckRowCount(kTableName, ClusterVerifier::EXACTLY, 0)); |
| |
| // Removing the same master again should result in an error |
| string err; |
| Status s = RemoveMasterFromClusterUsingCLITool(master_to_remove, &err, master_to_remove_uuid); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 not found", master_to_remove.ToString())); |
| |
| // Attempt transferring leadership to the removed master |
| LOG(INFO) << "Transferring leadership to master: " << master_to_remove_uuid; |
| s = TransferMasterLeadershipAsync(cluster_.get(), master_to_remove_uuid); |
| ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); |
| ASSERT_STR_CONTAINS(s.ToString(), |
| Substitute("tablet server $0 is not a voter in the active config", |
| master_to_remove_uuid)); |
| |
| LOG(INFO) << "Shutting down the old cluster"; |
| cluster_.reset(); |
| |
| LOG(INFO) << "Bringing up the migrated cluster"; |
| opts_.num_masters = orig_num_masters_ - 1; |
| opts_.master_rpc_addresses = updated_master_hps; |
| ExternalMiniCluster migrated_cluster(opts_); |
| ASSERT_OK(migrated_cluster.Start()); |
| for (int i = 0; i < migrated_cluster.num_masters(); i++) { |
| ASSERT_OK(migrated_cluster.master(i)->WaitForCatalogManager()); |
| } |
| |
| vector<HostPort> migrated_master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1, &migrated_master_hps, |
| &migrated_cluster)); |
| actual_master_hps.clear(); |
| actual_master_hps.insert(migrated_master_hps.begin(), migrated_master_hps.end()); |
| ASSERT_EQ(expected_master_hps, actual_master_hps); |
| |
| ClusterVerifier mcv(&migrated_cluster); |
| NO_FATALS(mcv.CheckCluster()); |
| NO_FATALS(mcv.CheckRowCount(kTableName, ClusterVerifier::EXACTLY, 0)); |
| } |
| |
| |
| class ParameterizedRecoverMasterTest : public DynamicMultiMasterTest, |
| public ::testing::WithParamInterface<int> { |
| public: |
| void SetUp() override { |
| NO_FATALS(SetUpWithNumMasters(GetParam())); |
| } |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(, ParameterizedRecoverMasterTest, |
| // Number of masters in a cluster |
| ::testing::Values(2, 3)); |
| |
| // Tests recovering a dead master at the same HostPort without explicit system catalog copy |
| TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterCatchupfromWAL) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| NO_FATALS(StartCluster({// Keeping RPC timeouts short to quickly detect downed servers. |
| // This will put the health status into an UNKNOWN state until the point |
| // where they are considered FAILED. |
| "--consensus_rpc_timeout_ms=2000", |
| "--follower_unavailable_considered_failed_sec=4"})); |
| |
| // Verify that existing masters are running as VOTERs. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| ASSERT_OK(CreateTable(cluster_.get(), kTableName)); |
| |
| int master_to_recover_idx = -1; |
| HostPort master_to_recover_hp; |
| HostPort src_master_hp; |
| NO_FATALS(FailAndRemoveFollowerMaster(master_hps, &master_to_recover_idx, &master_to_recover_hp, |
| &src_master_hp)); |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1)); |
| |
| // Add new master at the same HostPort |
| { |
| string err; |
| ASSERT_OK(BuildMasterOpts(master_to_recover_idx, master_to_recover_hp, &new_master_opts_)); |
| ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err)); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 successfully caught up from WAL.", |
| new_master_opts_.rpc_bind_address.ToString())); |
| } |
| |
| NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_)); |
| } |
| |
| // Tests recovering a dead master at the same HostPort with explicit system catalog copy |
| TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterSysCatalogCopy) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| vector<HostPort> master_hps; |
| NO_FATALS(StartClusterWithSysCatalogGCed( |
| &master_hps, |
| // Keeping RPC timeouts short to quickly detect downed servers. |
| // This will put the health status into an UNKNOWN state until the point |
| // where they are considered FAILED. |
| {"--consensus_rpc_timeout_ms=2000", |
| "--follower_unavailable_considered_failed_sec=4"})); |
| |
| // Verify that existing masters are running as VOTERs. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| |
| ASSERT_OK(CreateTable(cluster_.get(), kTableName)); |
| |
| int master_to_recover_idx = -1; |
| HostPort master_to_recover_hp; |
| HostPort src_master_hp; |
| NO_FATALS(FailAndRemoveFollowerMaster(master_hps, &master_to_recover_idx, &master_to_recover_hp, |
| &src_master_hp)); |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1)); |
| |
| // Add new master at the same HostPort |
| string err; |
| ASSERT_OK(BuildMasterOpts(master_to_recover_idx, master_to_recover_hp, &new_master_opts_)); |
| ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err)); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 could not be caught up from WAL.", |
| master_to_recover_hp.ToString())); |
| ASSERT_STR_CONTAINS(err, "Successfully copied system catalog and new master is healthy"); |
| |
| NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_)); |
| } |
| |
| // Test that brings up a single master cluster with 'last_known_addr' not populated by |
| // not specifying '--master_addresses' and then attempts to add a new master which is |
| // expected to fail due to invalid Raft config. |
| TEST_F(DynamicMultiMasterTest, TestAddMasterWithNoLastKnownAddr) { |
| NO_FATALS(SetUpWithNumMasters(1)); |
| NO_FATALS(StartCluster({}, false/* supply_single_master_addr */)); |
| |
| // Verify that existing masters are running as VOTERs. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| |
| // Add master to the cluster. |
| string err; |
| ExternalDaemonOptions opts; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts)); |
| Status actual = AddMasterToClusterUsingCLITool(opts, &err); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_MATCHES(err, "'last_known_addr' field in single master Raft configuration not set. " |
| "Please restart master with --master_addresses flag"); |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| } |
| |
| // Test that attempts to add a new master without enabling the feature flag for master Raft |
| // change config. |
| TEST_F(DynamicMultiMasterTest, TestAddMasterFeatureFlagNotSpecified) { |
| NO_FATALS(SetUpWithNumMasters(1)); |
| NO_FATALS(StartCluster({ "--master_support_change_config=false" })); |
| |
| // Verify that existing masters are running as VOTERs. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| |
| // Add master to the cluster. |
| string err; |
| ExternalDaemonOptions opts; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts)); |
| Status actual = AddMasterToClusterUsingCLITool(opts, &err); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_CONTAINS(err, "cluster does not support AddMaster " |
| "with feature(s) DYNAMIC_MULTI_MASTER"); |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| } |
| |
| // Test that attempts to remove an existing master without enabling the feature flag for master |
| // Raft change config. |
| TEST_F(DynamicMultiMasterTest, TestRemoveMasterFeatureFlagNotSpecified) { |
| NO_FATALS(SetUpWithNumMasters(2)); |
| NO_FATALS(StartCluster({"--master_support_change_config=false"})); |
| |
| // Verify that existing masters are running as VOTERs. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| // Try removing non-leader master. |
| { |
| int non_leader_master_idx = -1; |
| ASSERT_OK(GetFollowerMasterIndex(&non_leader_master_idx)); |
| auto master_to_remove = cluster_->master(non_leader_master_idx)->bound_rpc_hostport(); |
| string err; |
| Status s = RemoveMasterFromClusterUsingCLITool(master_to_remove, &err); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(err, "cluster does not support RemoveMaster " |
| "with feature(s) DYNAMIC_MULTI_MASTER"); |
| } |
| |
| // Try removing leader master |
| { |
| int leader_master_idx = -1; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| auto master_to_remove = cluster_->master(leader_master_idx)->bound_rpc_hostport(); |
| string err; |
| Status s = RemoveMasterFromClusterUsingCLITool(master_to_remove, &err); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(err, "cluster does not support RemoveMaster " |
| "with feature(s) DYNAMIC_MULTI_MASTER"); |
| } |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| } |
| |
| // Test that attempts to request a non-leader master to add a new master. |
| TEST_F(DynamicMultiMasterTest, TestAddMasterToNonLeader) { |
| NO_FATALS(SetUpWithNumMasters(2)); |
| NO_FATALS(StartCluster()); |
| |
| // Verify that existing masters are running as VOTERs and collect their addresses to be used |
| // for starting the new master. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| // Bring up the new master and add to the cluster. |
| master_hps.emplace_back(reserved_hp_); |
| scoped_refptr<ExternalMaster> master; |
| NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_, &master)); |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| |
| // Verify sending add master request to a non-leader master returns NOT_THE_LEADER error. |
| // It's possible there is a leadership change between querying for leader master and |
| // sending the add master request to non-leader master and hence using ASSERT_EVENTUALLY. |
| ASSERT_EVENTUALLY([&] { |
| AddMasterRequestPB req; |
| AddMasterResponsePB resp; |
| RpcController rpc; |
| *req.mutable_rpc_addr() = HostPortToPB(reserved_hp_); |
| rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER); |
| |
| int leader_master_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| ASSERT_TRUE(leader_master_idx == 0 || leader_master_idx == 1); |
| int non_leader_master_idx = !leader_master_idx; |
| ASSERT_OK(cluster_->master_proxy(non_leader_master_idx)->AddMaster(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(MasterErrorPB::NOT_THE_LEADER, resp.error().code()); |
| }); |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| } |
| |
| // Test that attempts to request a non-leader master to remove a master. |
| TEST_F(DynamicMultiMasterTest, TestRemoveMasterToNonLeader) { |
| NO_FATALS(SetUpWithNumMasters(2)); |
| NO_FATALS(StartCluster()); |
| |
| // Verify that existing masters are running as VOTERs and collect their addresses to be used |
| // for starting the new master. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| // In test below we use the master RPC directly to the non-leader master and a retry |
| // will have unintended consequences hence disabling master leadership transfer. |
| NO_FATALS(DisableMasterLeadershipTransfer()); |
| |
| // Verify sending remove master request to a non-leader master returns NOT_THE_LEADER error. |
| RemoveMasterRequestPB req; |
| RemoveMasterResponsePB resp; |
| RpcController rpc; |
| |
| int leader_master_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| ASSERT_TRUE(leader_master_idx == 0 || leader_master_idx == 1); |
| int non_leader_master_idx = !leader_master_idx; |
| *req.mutable_rpc_addr() = HostPortToPB(cluster_->master(leader_master_idx)->bound_rpc_hostport()); |
| rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER); |
| // Using the master proxy directly instead of using CLI as this test wants to test |
| // invoking RemoveMaster RPC to non-leader master. |
| ASSERT_OK(cluster_->master_proxy(non_leader_master_idx)->RemoveMaster(req, &resp, &rpc)); |
| ASSERT_TRUE(resp.has_error()); |
| ASSERT_EQ(MasterErrorPB::NOT_THE_LEADER, resp.error().code()); |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| } |
| |
| // Test that attempts to add a master with missing master address and a non-routable incorrect |
| // address. |
| TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) { |
| NO_FATALS(SetUpWithNumMasters(1)); |
| NO_FATALS(StartCluster()); |
| |
| // Verify that existing masters are running as VOTERs. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| |
| // Empty HostPort |
| { |
| string err; |
| ExternalDaemonOptions opts; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, HostPort(), &opts)); |
| Status actual = AddMasterToClusterUsingCLITool(opts, &err); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_CONTAINS(err, "must provide positional argument master_address"); |
| } |
| |
| // Non-routable incorrect hostname. |
| { |
| string err; |
| ExternalDaemonOptions opts; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, |
| HostPort("non-existent-path.local", Master::kDefaultPort), &opts)); |
| Status actual = AddMasterToClusterUsingCLITool(opts, &err); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_CONTAINS(err, "unable to resolve address for non-existent-path.local"); |
| } |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| } |
| |
| // Test that attempts to remove a master with missing master address and a non-existent |
| // hostname. |
| TEST_F(DynamicMultiMasterTest, TestRemoveMasterMissingAndIncorrectHostname) { |
| NO_FATALS(SetUpWithNumMasters(2)); |
| NO_FATALS(StartCluster()); |
| |
| // Verify that existing masters are running as VOTERs. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| // Empty HostPort. |
| { |
| string err; |
| Status actual = RemoveMasterFromClusterUsingCLITool(HostPort(), &err); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_CONTAINS(err, "must provide positional argument master_address"); |
| } |
| |
| // Non-existent hostname. |
| { |
| HostPort dummy_hp = HostPort("non-existent-path.local", Master::kDefaultPort); |
| string err; |
| Status actual = RemoveMasterFromClusterUsingCLITool(dummy_hp, &err); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_CONTAINS(err, Substitute("Master $0 not found", dummy_hp.ToString())); |
| } |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| } |
| |
| // Test that attempts to remove a master with mismatching hostname and uuid. |
| TEST_F(DynamicMultiMasterTest, TestRemoveMasterMismatchHostnameAndUuid) { |
| NO_FATALS(SetUpWithNumMasters(2)); |
| NO_FATALS(StartCluster()); |
| |
| // Verify that existing masters are running as VOTERs. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| // Master leadership transfer could result in a different error and hence disabling it. |
| NO_FATALS(DisableMasterLeadershipTransfer()); |
| |
| // Random uuid |
| Random rng(SeedRandom()); |
| auto random_uuid = std::to_string(rng.Next64()); |
| int non_leader_idx = -1; |
| ASSERT_OK(GetFollowerMasterIndex(&non_leader_idx)); |
| auto master_to_remove = cluster_->master(non_leader_idx)->bound_rpc_hostport(); |
| ASSERT_NE(random_uuid, cluster_->master(non_leader_idx)->uuid()); |
| string err; |
| Status actual = RemoveMasterFromClusterUsingCLITool(master_to_remove, &err, random_uuid); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_CONTAINS(err, |
| Substitute("Mismatch in UUID of the master $0 to be removed. " |
| "Expected: $1, supplied: $2.", master_to_remove.ToString(), |
| cluster_->master(non_leader_idx)->uuid(), random_uuid)); |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| } |
| |
| // Test that attempts to add a master with non-existent kudu executable path. |
| TEST_F(DynamicMultiMasterTest, TestAddMasterIncorrectKuduBinary) { |
| NO_FATALS(SetUpWithNumMasters(1)); |
| NO_FATALS(StartCluster()); |
| |
| // Verify that existing masters are running as VOTERs. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| |
| // Add master to the cluster. |
| string err; |
| string kudu_abs_path = "/tmp/path/to/nowhere"; |
| ExternalDaemonOptions opts; |
| ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts)); |
| Status actual = AddMasterToClusterUsingCLITool(opts, &err, {} /* env_vars */, 4, kudu_abs_path); |
| ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString(); |
| ASSERT_STR_CONTAINS(err, Substitute("kudu binary not found at $0", kudu_abs_path)); |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_)); |
| } |
| |
| // Test that attempts removing a leader master itself from a cluster with |
| // 1 or 2 masters. |
| class ParameterizedRemoveLeaderMasterTest : public DynamicMultiMasterTest, |
| public ::testing::WithParamInterface<int> { |
| public: |
| void SetUp() override { |
| NO_FATALS(SetUpWithNumMasters(GetParam())); |
| } |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(, ParameterizedRemoveLeaderMasterTest, |
| ::testing::Values(1, 2)); |
| |
| TEST_P(ParameterizedRemoveLeaderMasterTest, TestRemoveLeaderMaster) { |
| NO_FATALS(StartCluster()); |
| |
| // Verify that existing masters are running as VOTERs and collect their addresses to be used |
| // for starting the new master. |
| vector<HostPort> master_hps; |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| |
| // In test below a retry in case of master leadership transfer will have unintended |
| // consequences and hence disabling master leadership transfer. |
| NO_FATALS(DisableMasterLeadershipTransfer()); |
| |
| int leader_master_idx; |
| ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx)); |
| const auto master_to_remove = cluster_->master(leader_master_idx)->bound_rpc_hostport(); |
| string err; |
| Status s = RemoveMasterFromClusterUsingCLITool(master_to_remove, &err); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); |
| if (orig_num_masters_ == 1) { |
| ASSERT_STR_CONTAINS(err, Substitute("Can't remove master $0 in a single master " |
| "configuration", master_to_remove.ToString())); |
| } else { |
| ASSERT_GT(orig_num_masters_, 1); |
| ASSERT_STR_CONTAINS(err, Substitute("Can't remove the leader master $0", |
| master_to_remove.ToString())); |
| } |
| |
| // Verify no change in number of masters. |
| NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps)); |
| } |
| |
// Arguments describing the cluster an AutoAddMasterTest starts with.
struct MultiMasterClusterArgs {
  // Number of masters to start the cluster with.
  int orig_num_masters;
  // Whether Kerberos is enabled for the cluster.
  bool is_secure;
};
| |
| class AutoAddMasterTest : public KuduTest { |
| public: |
| Status SetUpWithTestArgs(const MultiMasterClusterArgs& args) { |
| opts_.num_masters = args.orig_num_masters; |
| opts_.enable_kerberos = args.is_secure; |
| args_ = args; |
| cluster_.reset(new ExternalMiniCluster(opts_)); |
| RETURN_NOT_OK(cluster_->Start()); |
| return cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(), |
| MonoDelta::FromSeconds(10)); |
| } |
| void SetUp() override { |
| ASSERT_OK(SetUpWithTestArgs({ /*orig_num_masters*/2, /*is_secure*/false })); |
| TestWorkload w(cluster_.get()); |
| w.set_num_replicas(1); |
| w.Setup(); |
| } |
| protected: |
| MultiMasterClusterArgs args_; |
| ExternalMiniClusterOptions opts_; |
| unique_ptr<ExternalMiniCluster> cluster_; |
| }; |
| |
// Short retry interval, in seconds. NOTE(review): not referenced in this part of the
// file; presumably consumed by tests further down -- confirm.
constexpr int64_t kShortRetryIntervalSecs = 1;
| |
| // Test that nothing goes wrong when starting up masters but the entire cluster |
| // isn't fully healthy. The auto-add checks should still run, but should be |
| // inconsequential if they fail because the entire cluster isn't healthy. |
| TEST_F(AutoAddMasterTest, TestRestartMastersWhileSomeDown) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| // We'll start with three masters, and then restart two, leaving one down. |
| ASSERT_OK(cluster_->AddMaster()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get())); |
| }); |
| |
| // Emulate one of the masters going down by only restarting two. |
| cluster_->Shutdown(); |
| for (int i = 1; i < cluster_->num_masters(); i++) { |
| ASSERT_OK(cluster_->master(i)->Restart()); |
| } |
| int table_idx = 0; |
| constexpr const char* kTablePrefix = "default.table"; |
| const auto& deadline = MonoTime::Now() + MonoDelta::FromSeconds(30); |
| while (MonoTime::Now() > deadline) { |
| SleepFor(MonoDelta::FromSeconds(1)); |
| // Nothing sinister should happen despite one master being down. The |
| // remaining masters should be operable and alive. |
| ASSERT_OK(CreateTable(cluster_.get(), Substitute("$0-$1", kTablePrefix, ++table_idx))); |
| for (int i = 1; i < cluster_->num_masters(); i++) { |
| ASSERT_TRUE(cluster_->master(i)->IsProcessAlive()); |
| } |
| } |
| ASSERT_OK(cluster_->master(0)->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get())); |
| }); |
| } |
| |
| // Test the procedure when some masters aren't reachable. |
| TEST_F(AutoAddMasterTest, TestSomeMastersUnreachable) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| auto* stopped_master = cluster_->master(0); |
| ASSERT_OK(stopped_master->Pause()); |
| // Adding a master to a cluster wherein a master is already down will fail. |
| // This is similar behavior to starting a new master while some are down |
| // since the new master can't resolve all peers' UUIDs. Shorten the time |
| // masters will wait to communicate to all peers to speed up this test. |
| ASSERT_OK(cluster_->AddMaster({ "--raft_get_node_instance_timeout_ms=3000" })); |
| auto* new_master = cluster_->master(args_.orig_num_masters); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_FALSE(new_master->IsProcessAlive()); |
| }); |
| ASSERT_OK(stopped_master->Resume()); |
| |
| // Even after restarting, we still won't be quite able to start healthily |
| // because our previous crashes will have left an unusable set of metadata |
| // (i.e. no consensus metadata). |
| new_master->Shutdown(); |
| ASSERT_OK(new_master->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_FALSE(new_master->IsProcessAlive()); |
| }); |
| // If we blow away our new master and start anew, we should be able to |
| // proceed. |
| new_master->Shutdown(); |
| ASSERT_OK(new_master->DeleteFromDisk()); |
| ASSERT_OK(new_master->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get())); |
| }); |
| // Ensure that even after waiting a bit, our cluster is stable. |
| SleepFor(MonoDelta::FromSeconds(3)); |
| ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get())); |
| } |
| |
| // Test if we fail to replicate the AddMaster request. |
| TEST_F(AutoAddMasterTest, TestFailWithoutReplicatingAddMaster) { |
| // Make master followers unable to accept updates, including config changes. |
| // We'll set this for all masters including leaders for simplicity. |
| for (int i = 0; i < cluster_->num_masters(); i++) { |
| ASSERT_OK(cluster_->SetFlag(cluster_->master(i), |
| "follower_reject_update_consensus_requests", "true")); |
| } |
| // Upon starting, the master will attempt to add itself, but fail to do so. |
| // Even after several attempts. |
| ASSERT_OK(cluster_->AddMaster({ Substitute("--master_auto_join_retry_interval_secs=$0", |
| kShortRetryIntervalSecs) })); |
| auto* new_master = cluster_->master(args_.orig_num_masters); |
| SleepFor(MonoDelta::FromSeconds(5 * kShortRetryIntervalSecs)); |
| |
| // The new master should still be around, but not be added as a part of the |
| // Raft group. |
| ASSERT_TRUE(new_master->IsProcessAlive()); |
| Status s = VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get()); |
| ASSERT_FALSE(s.ok()); |
| ASSERT_STR_CONTAINS(s.ToString(), "expected 3 masters but got 2"); |
| |
| // Since nothing was successfully replicated, it shouldn't be a problem to |
| // start up again and re-add. |
| new_master->Shutdown(); |
| for (int i = 0; i < cluster_->num_masters() - 1; i++) { |
| ASSERT_OK(cluster_->SetFlag(cluster_->master(i), |
| "follower_reject_update_consensus_requests", "false")); |
| } |
| ASSERT_OK(new_master->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get())); |
| }); |
| } |
| |
| // Test when the new master fails to copy. |
| TEST_F(AutoAddMasterTest, TestFailTabletCopy) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| ASSERT_OK(cluster_->AddMaster({ "--tablet_copy_fault_crash_during_download_wal=1" })); |
| auto* new_master = cluster_->master(args_.orig_num_masters); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_FALSE(new_master->IsProcessAlive()); |
| }); |
| // We should have been able to add the master to the Raft quorum, but been |
| // able to copy. Upon doing so, the new master should fail to come up. |
| new_master->Shutdown(); |
| new_master->mutable_flags()->emplace_back("--tablet_copy_fault_crash_during_download_wal=0"); |
| ASSERT_OK(new_master->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_FALSE(new_master->IsProcessAlive()); |
| }); |
| |
| // Even blowing the new master away entirely will result in a new master |
| // being unable to join. The cluster already believes there to be a new |
| // master, but no live majority, so we're unable to add _another_ master. |
| ASSERT_OK(new_master->DeleteFromDisk()); |
| new_master->Shutdown(); |
| ASSERT_OK(new_master->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_FALSE(new_master->IsProcessAlive()); |
| }); |
| new_master->Shutdown(); |
| |
| // So, we first need to remove the master from the quorum, and then restart, |
| // at which point the new master should be able to join the cluster. |
| vector<string> addresses; |
| for (const auto& hp : cluster_->master_rpc_addrs()) { |
| addresses.emplace_back(hp.ToString()); |
| } |
| // TODO(awong): we should really consider automating this step from the |
| // leader master. |
| ASSERT_OK(tools::RunKuduTool({ "master", "remove", JoinStrings(addresses, ","), |
| new_master->bound_rpc_hostport().ToString() })); |
| ASSERT_OK(new_master->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get())); |
| }); |
| SleepFor(MonoDelta::FromSeconds(3)); |
| ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, cluster_.get())); |
| } |
| |
| TEST_F(AutoAddMasterTest, TestAddWithOnGoingDdl) { |
| simple_spinlock master_addrs_lock; |
| vector<string> master_addrs_unlocked; |
| for (const auto& hp : cluster_->master_rpc_addrs()) { |
| master_addrs_unlocked.emplace_back(hp.ToString()); |
| } |
| |
| // Start a thread that creates a client and tries to create tables. |
| const auto generate_client = [&] (shared_ptr<KuduClient>* c) { |
| vector<string> master_addrs; |
| { |
| std::lock_guard<simple_spinlock> l(master_addrs_lock); |
| master_addrs = master_addrs_unlocked; |
| } |
| shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(client::KuduClientBuilder() |
| .master_server_addrs(master_addrs) |
| .Build(&client)); |
| *c = std::move(client); |
| return Status::OK(); |
| }; |
| |
| atomic<bool> proceed = true; |
| constexpr const int kNumThreads = 2; |
| vector<thread> threads; |
| threads.reserve(kNumThreads); |
| vector<Status> errors(kNumThreads); |
| for (int i = 0; i < kNumThreads; i++) { |
| threads.emplace_back([&, i] { |
| int idx = 0; |
| while (proceed) { |
| client::sp::shared_ptr<KuduClient> c; |
| Status s = generate_client(&c).AndThen([&] { |
| return CreateTableWithClient(c.get(), Substitute("default.$0_$1", i, ++idx)); |
| }); |
| if (!s.ok()) { |
| errors[i] = s; |
| } |
| SleepFor(MonoDelta::FromSeconds(1)); |
| } |
| }); |
| } |
| auto thread_joiner = MakeScopedCleanup([&] { |
| proceed = false; |
| for (auto& t : threads) { |
| t.join(); |
| } |
| }); |
| |
| int num_masters = args_.orig_num_masters; |
| for (int i = 0; i < 3; i++) { |
| ASSERT_OK(cluster_->AddMaster()); |
| auto* new_master = cluster_->master(args_.orig_num_masters); |
| ASSERT_OK(new_master->WaitForCatalogManager()); |
| num_masters++; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVotersOnAllMasters(num_masters, cluster_.get())); |
| }); |
| { |
| std::lock_guard<simple_spinlock> l(master_addrs_lock); |
| master_addrs_unlocked.emplace_back(new_master->bound_rpc_hostport().ToString()); |
| } |
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(), |
| MonoDelta::FromSeconds(5))); |
| } |
| proceed = false; |
| thread_joiner.cancel(); |
| for (auto& t : threads) { |
| t.join(); |
| } |
| for (const auto& e : errors) { |
| // NOTE: the table may exist if the CreateTable call is retried. |
| if (e.ok() || e.IsTimedOut() || e.IsAlreadyPresent()) { |
| continue; |
| } |
| // TODO(awong): we should relax the need for clients to have the precise |
| // list of masters. |
| if (e.IsConfigurationError()) { |
| ASSERT_STR_CONTAINS(e.ToString(), "cluster indicates it expects"); |
| continue; |
| } |
| // TODO(KUDU-1358): we should probably allow clients to retry if the RF is |
| // within some normal-looking range. |
| ASSERT_TRUE(e.IsInvalidArgument()) << e.ToString(); |
| ASSERT_STR_CONTAINS(e.ToString(), "not enough live tablet servers"); |
| } |
| } |
| |
| TEST_F(AutoAddMasterTest, TestAddNewMaster) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| // Let's get the current master addresses and add a new one to them |
| vector<HostPort> master_addrs = cluster_->master_rpc_addrs(); |
| unique_ptr<Socket> reserved_socket; |
| ASSERT_OK(MiniCluster::ReserveDaemonSocket( |
| MiniCluster::DaemonType::MASTER, |
| master_addrs.size(), |
| opts_.bind_mode, |
| &reserved_socket)); |
| |
| Sockaddr addr; |
| ASSERT_OK(reserved_socket->GetSocketAddress(&addr)); |
| master_addrs.emplace_back(addr.host(), addr.port()); |
| |
| const auto& new_master_addrs_list = |
| HostPort::ToCommaSeparatedString(master_addrs); |
| |
| cluster_->Shutdown(); |
| |
| // We have to remove the previous '--master_addresses' flag to ensure there |
| // are no duplicates, then we set the new addresses on the existing masters. |
| for (int i = 0; i < cluster_->num_masters(); i++) { |
| auto* flags = cluster_->master(i)->mutable_flags(); |
| flags->erase( |
| remove_if(flags->begin(), |
| flags->end(), |
| [](const string &s) { |
| return HasPrefixString(s, "--master_addresses="); |
| }), flags->end()); |
| } |
| |
| for (int i = 0; i < cluster_->num_masters(); i++) { |
| cluster_->master(i)->mutable_flags()->emplace_back( |
| Substitute("--master_addresses=$0", new_master_addrs_list)); |
| } |
| |
| // Starting up the two existing masters with three addresses should cause no |
| // issues. |
| ASSERT_OK(cluster_->Restart()); |
| ASSERT_OK(cluster_->master(0)->WaitForCatalogManager()); |
| |
| // Let's create the new master and start it to ensure it starts up okay. |
| scoped_refptr<ExternalMaster> peer; |
| auto idx = cluster_->master_rpc_addrs().size(); |
| ASSERT_OK(cluster_->CreateMaster(master_addrs, idx, &peer)); |
| ASSERT_OK(peer->Start()); |
| ASSERT_OK(peer->WaitForCatalogManager()); |
| auto expected_num_masters = ++idx; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVotersOnAllMasters(expected_num_masters, cluster_.get())); |
| }); |
| NO_FATALS(cluster_->AssertNoCrashes()); |
| } |
| |
| class ParameterizedAutoAddMasterTest : public AutoAddMasterTest, |
| public ::testing::WithParamInterface<tuple<int, bool>> { |
| public: |
| void SetUp() override { |
| ASSERT_OK(SetUpWithTestArgs({ /*orig_num_masters*/std::get<0>(GetParam()), |
| /*is_secure*/std::get<1>(GetParam()) })); |
| } |
| }; |
| |
| TEST_P(ParameterizedAutoAddMasterTest, TestBasicAddition) { |
| TestWorkload w(cluster_.get()); |
| w.set_num_replicas(1); |
| w.Setup(); |
| w.Start(); |
| int num_masters = args_.orig_num_masters; |
| for (int i = 0; i < 3; i++) { |
| ASSERT_OK(cluster_->AddMaster()); |
| auto* new_master = cluster_->master(args_.orig_num_masters); |
| ASSERT_OK(new_master->WaitForCatalogManager()); |
| num_masters++; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVoterMastersForCluster(num_masters, nullptr, cluster_.get())); |
| }); |
| } |
| w.StopAndJoin(); |
| ClusterVerifier cv(cluster_.get()); |
| NO_FATALS(cv.CheckCluster()); |
| NO_FATALS(cv.CheckRowCount(w.kDefaultTableName, ClusterVerifier::EXACTLY, w.rows_inserted())); |
| |
| cluster_->Shutdown(); |
| ASSERT_OK(cluster_->Restart()); |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(VerifyVoterMastersForCluster(num_masters, nullptr, cluster_.get())); |
| }); |
| |
| NO_FATALS(cv.CheckCluster()); |
| NO_FATALS(cv.CheckRowCount(w.kDefaultTableName, ClusterVerifier::EXACTLY, w.rows_inserted())); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(, |
| ParameterizedAutoAddMasterTest, ::testing::Combine( |
| ::testing::Values(1, 2), |
| ::testing::Bool()), |
| [] (const ::testing::TestParamInfo<ParameterizedAutoAddMasterTest::ParamType>& info) { |
| return Substitute("$0_orig_masters_$1secure", std::get<0>(info.param), |
| std::get<1>(info.param) ? "" : "not_"); |
| }); |
| |
| } // namespace master |
| } // namespace kudu |