| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstdio> |
| #include <deque> |
| #include <functional> |
| #include <iterator> |
| #include <limits> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <tuple> |
| #include <type_traits> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client-test-util.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/value.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/strip.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| #include "kudu/integration-tests/cluster_verifier.h" |
| #include "kudu/integration-tests/test_workload.h" |
| #include "kudu/integration-tests/ts_itest-base.h" |
| #include "kudu/master/catalog_manager.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master_options.h" |
| #include "kudu/master/sys_catalog.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tools/tool_test_util.h" |
| #include "kudu/tserver/tablet_server-test-base.h" |
| #include "kudu/util/cow_object.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| DECLARE_int32(num_replicas); |
| DECLARE_int32(num_tablet_servers); |
| DECLARE_string(sasl_protocol_name); |
| |
| using kudu::client::KuduClient; |
| using kudu::client::KuduClientBuilder; |
| using kudu::client::KuduColumnSchema; |
| using kudu::client::KuduColumnStorageAttributes; |
| using kudu::client::KuduInsert; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableAlterer; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::KuduValue; |
| using kudu::client::sp::shared_ptr; |
| using kudu::cluster::ExternalMiniCluster; |
| using kudu::cluster::ExternalTabletServer; |
| using kudu::cluster::ExternalMiniCluster; |
| using kudu::cluster::ExternalMiniClusterOptions; |
| using kudu::consensus::COMMITTED_OPID; |
| using kudu::consensus::ConsensusStatePB; |
| using kudu::consensus::EXCLUDE_HEALTH_REPORT; |
| using kudu::consensus::LeaderStepDownMode; |
| using kudu::consensus::OpId; |
| using kudu::itest::FindTabletFollowers; |
| using kudu::itest::FindTabletLeader; |
| using kudu::itest::GetConsensusState; |
| using kudu::itest::ListTablesWithInfo; |
| using kudu::itest::StartElection; |
| using kudu::itest::WaitUntilLeader; |
| using kudu::itest::TabletServerMap; |
| using kudu::itest::TServerDetails; |
| using kudu::itest::WAIT_FOR_LEADER; |
| using kudu::itest::WaitForReplicasReportedToMaster; |
| using kudu::itest::WaitForServersToAgree; |
| using kudu::itest::WaitUntilCommittedConfigNumVotersIs; |
| using kudu::itest::WaitUntilTabletInState; |
| using kudu::itest::WaitUntilTabletRunning; |
| using kudu::master::Master; |
| using kudu::master::MasterOptions; |
| using kudu::master::SysCatalogTable; |
| using kudu::master::TableInfo; |
| using kudu::master::TableInfoLoader; |
| using kudu::master::TableMetadataLock; |
| using kudu::master::TabletInfo; |
| using kudu::master::TabletInfoLoader; |
| using kudu::master::TabletMetadataGroupLock; |
| using kudu::master::VOTER_REPLICA; |
| using kudu::pb_util::SecureDebugString; |
| using std::back_inserter; |
| using std::copy; |
| using std::deque; |
| using std::endl; |
| using std::ostringstream; |
| using std::string; |
| using std::pair; |
| using std::tuple; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Split; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| namespace tools { |
| |
namespace {
// Callback that always reports success. Useful as a no-op default wherever a
// status-returning callback is required.
// NOTE(review): not referenced in this portion of the file -- presumably used
// by tests further down; confirm before removing.
Status NoOpCb() {
  return Status::OK();
}
} // anonymous namespace
| |
| // Helper to format info when a tool action fails. |
| static string ToolRunInfo(const Status& s, const string& out, const string& err) { |
| ostringstream str; |
| str << s.ToString() << endl; |
| str << "stdout: " << out << endl; |
| str << "stderr: " << err << endl; |
| return str.str(); |
| } |
| |
// Helper macro for tool tests. Use as follows:
//
//   ASSERT_TOOL_OK("cluster", "ksck", master_addrs);
//
// The failure Status result of RunKuduTool is usually useless, so this macro
// also logs the stdout and stderr in case of failure, for easier diagnosis.
// The do { } while (0) wrapper makes the macro expand to a single statement,
// so it composes safely with unbraced if/else at call sites.
// TODO(wdberkeley): Add a macro to retrieve stdout or stderr, or a macro for
// when one of those should match a string.
#define ASSERT_TOOL_OK(...) do { \
  const vector<string>& _args{__VA_ARGS__}; \
  string _out, _err; \
  const Status& _s = RunKuduTool(_args, &_out, &_err); \
  if (_s.ok()) { \
    SUCCEED(); \
  } else { \
    FAIL() << ToolRunInfo(_s, _out, _err); \
  } \
} while (0)
| |
| class AdminCliTest : public tserver::TabletServerIntegrationTestBase { |
| protected: |
| // Find a remaining node which will be picked for re-replication. |
| const TServerDetails* |
| FindNodeForReReplication(const TabletServerMap& active_tablet_servers) const { |
| vector<TServerDetails*> all_tservers; |
| AppendValuesFromMap(tablet_servers_, &all_tservers); |
| for (const auto *ts : all_tservers) { |
| if (!ContainsKey(active_tablet_servers, ts->uuid())) { |
| return ts; |
| } |
| } |
| return nullptr; |
| } |
| }; |
| |
| TEST_F(AdminCliTest, TestFindTabletFollowers) { |
| FLAGS_num_tablet_servers = 5; |
| FLAGS_num_replicas = 3; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| TServerDetails* leader; |
| const auto timeout = MonoDelta::FromSeconds(30); |
| ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, timeout, &leader)); |
| vector<TServerDetails*> followers; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(FindTabletFollowers(tablet_servers_, tablet_id_, timeout, &followers)); |
| }); |
| ASSERT_EQ(FLAGS_num_replicas - 1, followers.size()); |
| for (const auto& follower : followers) { |
| ASSERT_NE(leader->uuid(), follower->uuid()); |
| } |
| } |
| |
| TEST_F(AdminCliTest, TestFindTabletFollowersWithIncompleteTabletServers) { |
| FLAGS_num_tablet_servers = 5; |
| FLAGS_num_replicas = 3; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| TServerDetails* leader; |
| const auto timeout = MonoDelta::FromSeconds(30); |
| ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, timeout, &leader)); |
| |
| // Incomplete tablet servers that contains only the leader tablet server. |
| TabletServerMap incomplete_tablet_servers; |
| InsertOrDie(&incomplete_tablet_servers, leader->uuid(), leader); |
| vector<TServerDetails*> followers; |
| ASSERT_EVENTUALLY([&] { |
| Status s = FindTabletFollowers(incomplete_tablet_servers, tablet_id_, timeout, &followers); |
| ASSERT_TRUE(s.IsInvalidArgument()); |
| ASSERT_STR_CONTAINS( |
| s.ToString(), |
| "Not all peers reported by tablet servers are part of the supplied tablet servers"); |
| }); |
| } |
| |
| // Test config change while running a workload. |
| // 1. Instantiate external mini cluster with 3 TS. |
| // 2. Create table with 2 replicas. |
| // 3. Invoke CLI to trigger a config change. |
| // 4. Wait until the new server bootstraps. |
| // 5. Profit! |
| TEST_F(AdminCliTest, TestChangeConfig) { |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| "--allow_unsafe_replication_factor=true", |
| |
| // If running with the 3-4-3 replication scheme, the catalog manager removes |
| // excess replicas, so it's necessary to disable that default behavior |
| // since this test manages replicas on its own. |
| "--catalog_manager_evict_excess_replicas=false", |
| }; |
| const vector<string> kTserverFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 2; |
| NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); |
| |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| |
| TabletServerMap active_tablet_servers; |
| TabletServerMap::const_iterator iter = tablet_replicas_.find(tablet_id_); |
| TServerDetails* leader = iter->second; |
| TServerDetails* follower = (++iter)->second; |
| InsertOrDie(&active_tablet_servers, leader->uuid(), leader); |
| InsertOrDie(&active_tablet_servers, follower->uuid(), follower); |
| |
| TServerDetails* new_node = nullptr; |
| for (TServerDetails* ts : tservers) { |
| if (!ContainsKey(active_tablet_servers, ts->uuid())) { |
| new_node = ts; |
| break; |
| } |
| } |
| ASSERT_NE(nullptr, new_node); |
| |
| // Elect the leader (still only a consensus config size of 2). |
| ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10))); |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, |
| tablet_id_, 1)); |
| |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(true); |
| workload.set_write_timeout_millis(10000); |
| workload.set_num_replicas(FLAGS_num_replicas); |
| workload.set_num_write_threads(1); |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // Wait until the Master knows about the leader tserver. |
| TServerDetails* master_observed_leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); |
| ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); |
| |
| LOG(INFO) << "Adding replica at tserver with UUID " |
| << new_node->uuid() << " as VOTER..."; |
| ASSERT_TOOL_OK( |
| "tablet", |
| "change_config", |
| "add_replica", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| tablet_id_, |
| new_node->uuid(), |
| "VOTER" |
| ); |
| |
| InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader, tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| |
| workload.StopAndJoin(); |
| int num_batches = workload.batches_completed(); |
| |
| LOG(INFO) << "Waiting for replicas to agree..."; |
| // Wait for all servers to replicate everything up through the last write op. |
| // Since we don't batch, there should be at least # rows inserted log entries, |
| // plus the initial leader's no-op, plus 1 for |
| // the added replica for a total == #rows + 2. |
| int min_log_index = num_batches + 2; |
| ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), |
| active_tablet_servers, tablet_id_, |
| min_log_index)); |
| |
| int rows_inserted = workload.rows_inserted(); |
| LOG(INFO) << "Number of rows inserted: " << rows_inserted; |
| |
| ClusterVerifier v(cluster_.get()); |
| NO_FATALS(v.CheckCluster()); |
| NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::AT_LEAST, rows_inserted)); |
| |
| // Now remove the server. |
| LOG(INFO) << "Removing replica at tserver with UUID " |
| << new_node->uuid() << " from the config..."; |
| ASSERT_TOOL_OK( |
| "tablet", |
| "change_config", |
| "remove_replica", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| tablet_id_, |
| new_node->uuid() |
| ); |
| |
| ASSERT_EQ(1, active_tablet_servers.erase(new_node->uuid())); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
| leader, tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| } |
| |
// Which tablet server, if any, to shut down while a replica move is underway.
enum class DownTS {
  // All tablet servers stay up.
  None,
  // Shut down a server that hosts a replica of the tablet being moved.
  TabletPeer,
  // Shut down a server that hosts no replica of the tablet being moved.
  // Regression case for KUDU-2331.
  UninvolvedTS,
};

// Parameterized over the replica management scheme (KUDU-1097 on/off) and
// the down-server scenario above.
class MoveTabletParamTest :
    public AdminCliTest,
    public ::testing::WithParamInterface<tuple<Kudu1097, DownTS>> {
};
| |
// Moves a tablet replica between servers via the 'tablet change_config
// move_replica' CLI action, optionally with one tablet server down, and
// verifies the move succeeds (or fails, when a peer is down).
TEST_P(MoveTabletParamTest, Test) {
  const MonoDelta timeout = MonoDelta::FromSeconds(30);
  const auto& param = GetParam();
  const auto enable_kudu_1097 = std::get<0>(param);
  const auto downTS = std::get<1>(param);

  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;

  // Masters and tablet servers must agree on the replica management scheme.
  vector<string> ts_flags, master_flags;
  ts_flags = master_flags = {
      Substitute("--raft_prepare_replacement_before_eviction=$0",
                 enable_kudu_1097 == Kudu1097::Enable) };
  NO_FATALS(BuildAndStart(ts_flags, master_flags));

  vector<string> tservers;
  AppendKeysFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());

  // UUIDs of the servers currently hosting a replica of the tablet.
  // NOTE(review): this iterates from find() to cend() rather than over
  // equal_range(); it relies on the test having created only a single
  // tablet, so every entry from the first match onward belongs to
  // tablet_id_ -- confirm if this test ever creates more tablets.
  deque<string> active_tservers;
  for (auto iter = tablet_replicas_.find(tablet_id_); iter != tablet_replicas_.cend(); ++iter) {
    active_tservers.push_back(iter->second->uuid());
  }
  ASSERT_EQ(FLAGS_num_replicas, active_tservers.size());

  // The inactive servers (hosting no replica) are the set difference of all
  // servers and the active ones; both sides must be sorted first.
  deque<string> inactive_tservers;
  std::sort(tservers.begin(), tservers.end());
  std::sort(active_tservers.begin(), active_tservers.end());
  std::set_difference(tservers.cbegin(), tservers.cend(),
                      active_tservers.cbegin(), active_tservers.cend(),
                      std::back_inserter(inactive_tservers));
  ASSERT_EQ(FLAGS_num_tablet_servers - FLAGS_num_replicas, inactive_tservers.size());

  // The workload is light (1 thread, 1 op batches) so that new replicas
  // bootstrap and converge quickly.
  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTableId);
  workload.set_num_replicas(FLAGS_num_replicas);
  workload.set_num_write_threads(1);
  workload.set_write_batch_size(1);
  workload.set_write_timeout_millis(timeout.ToMilliseconds());
  workload.Setup();
  workload.Start();

  if (downTS == DownTS::TabletPeer) {
    // To test that the move fails if any peer is down, when downTS is
    // 'TabletPeer', shut down a server besides 'add' that hosts a replica.
    NO_FATALS(cluster_->tablet_server_by_uuid(active_tservers.back())->Shutdown());
  } else if (downTS == DownTS::UninvolvedTS) {
    // Regression case for KUDU-2331, where move_replica would fail if any tablet
    // server is down, even if that tablet server was not involved in the move.
    NO_FATALS(cluster_->tablet_server_by_uuid(inactive_tservers.back())->Shutdown());
  }

  // If we're not bringing down a tablet server, do 3 moves.
  // Assuming no ad hoc leadership changes, 3 guarantees the leader is moved at least once.
  int num_moves = AllowSlowTests() && (downTS == DownTS::None) ? 3 : 1;
  for (int i = 0; i < num_moves; i++) {
    // Move a replica off the front of the active list onto the front of the
    // inactive list.
    const string remove = active_tservers.front();
    const string add = inactive_tservers.front();
    vector<string> tool_command = {
      "tablet",
      "change_config",
      "move_replica",
      cluster_->master()->bound_rpc_addr().ToString(),
      tablet_id_,
      remove,
      add,
    };

    string stdout, stderr;
    Status s = RunKuduTool(tool_command, &stdout, &stderr);
    if (downTS == DownTS::TabletPeer) {
      // With a replica's host down, the move is expected to fail.
      ASSERT_TRUE(s.IsRuntimeError());
      workload.StopAndJoin();
      return;
    }
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

    // Bookkeeping: 'add' now hosts a replica and 'remove' no longer does.
    active_tservers.pop_front();
    active_tservers.push_back(add);
    inactive_tservers.pop_front();
    inactive_tservers.push_back(remove);

    // Allow the added server time to catch up so it applies the newest configuration.
    // If we don't wait, the initial ksck of move_tablet can fail with consensus conflict.
    TabletServerMap active_tservers_map;
    for (const string& uuid : active_tservers) {
      InsertOrDie(&active_tservers_map, uuid, tablet_servers_[uuid]);
    }
    ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(/*num_voters=*/ FLAGS_num_replicas,
                                                  active_tservers_map[add],
                                                  tablet_id_, timeout));
    NO_FATALS(WaitUntilCommittedConfigNumMembersIs(/*num_members=*/ FLAGS_num_replicas,
                                                   active_tservers_map[add],
                                                   tablet_id_, timeout));

  }
  workload.StopAndJoin();
  NO_FATALS(cluster_->AssertNoCrashes());

  // If a tablet server is down, we need to skip the ClusterVerifier.
  if (downTS == DownTS::None) {
    ClusterVerifier v(cluster_.get());
    NO_FATALS(v.CheckCluster());
  }
}
| |
// Exercise the move test over the full cross product of the replica
// management scheme (KUDU-1097 disabled/enabled) and the three down-server
// scenarios.
INSTANTIATE_TEST_SUITE_P(EnableKudu1097AndDownTS, MoveTabletParamTest,
                         ::testing::Combine(::testing::Values(Kudu1097::Disable,
                                                              Kudu1097::Enable),
                                            ::testing::Values(DownTS::None,
                                                              DownTS::TabletPeer,
                                                              DownTS::UninvolvedTS)));
| |
| Status RunUnsafeChangeConfig(const string& tablet_id, |
| const string& dst_host, |
| const vector<string>& peer_uuid_list) { |
| vector<string> command_args = { |
| "remote_replica", |
| "unsafe_change_config", |
| dst_host, |
| tablet_id |
| }; |
| copy(peer_uuid_list.begin(), peer_uuid_list.end(), back_inserter(command_args)); |
| return RunKuduTool(command_args); |
| } |
| |
// Test unsafe config change when there is one follower survivor in the cluster.
// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down leader and follower1.
// 3. Trigger unsafe config change on follower2 having follower2 in the config.
// 4. Wait until the new config is populated on follower2(new leader) and master.
// 5. Bring up leader and follower1 and verify replicas are deleted.
// 6. Verify that new config doesn't contain old leader and follower1.
TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  // The two spare tablet servers (5 TS, RF=3) give the master somewhere to
  // place replicas during re-replication later in the test.
  NO_FATALS(BuildAndStart());

  LOG(INFO) << "Finding tablet leader and waiting for things to start...";
  string tablet_id = tablet_replicas_.begin()->first;

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id, kTimeout,
                                            WAIT_FOR_LEADER,
                                            VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id, kTimeout, &followers));
  });
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id, COMMITTED_OPID, kTimeout));

  // Shut down master so it doesn't interfere while we shut down the leader and
  // one of the other followers.
  cluster_->master()->Shutdown();
  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();


  LOG(INFO) << "Forcing unsafe config change on remaining follower " << followers[0]->uuid();
  const string& follower0_addr =
      cluster_->tablet_server_by_uuid(followers[0]->uuid())->bound_rpc_addr().ToString();
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id, follower0_addr, { followers[0]->uuid() }));
  // The surviving follower should become leader of the new one-replica config.
  ASSERT_OK(WaitUntilLeader(followers[0], tablet_id, kTimeout));
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, followers[0], tablet_id, kTimeout));

  LOG(INFO) << "Restarting master...";

  // Restart master so it can re-replicate the tablet to remaining tablet servers.
  ASSERT_OK(cluster_->master()->Restart());

  // Wait for master to re-replicate.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, followers[0], tablet_id, kTimeout));
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  OpId opid;
  ASSERT_OK(WaitForOpFromCurrentTerm(followers[0], tablet_id, COMMITTED_OPID, kTimeout, &opid));

  // Rebuild the active server map from the locations the master now reports.
  active_tablet_servers.clear();
  // NOTE(review): 'replica_uuids' appears unused below -- candidate for removal.
  std::unordered_set<string> replica_uuids;
  for (const auto& loc : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(loc.ts_info_idx()).permanent_uuid();
    InsertOrDie(&active_tablet_servers, uuid, tablet_servers_[uuid]);
  }
  ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id, opid.index()));

  // Verify that two new servers are part of new config and old
  // servers are gone.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[1]->uuid());
    ASSERT_NE(uuid, leader_ts->uuid());
  }

  // Also verify that when we bring back followers[1] and the old leader,
  // their stale replicas (no longer part of the config) stop running.
  // NOTE(review): the wait below is for tablet::STOPPED rather than a
  // tombstoned state -- confirm STOPPED is the terminal state reported here.
  ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
  ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
  ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::STOPPED, kTimeout));
  ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::STOPPED, kTimeout));
}
| |
// Test unsafe config change when there is one leader survivor in the cluster.
// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down both followers.
// 3. Trigger unsafe config change on leader having leader in the config.
// 4. Wait until the new config is populated on leader and master.
// 5. Verify that new config does not contain old followers.
TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down servers follower1 and follower2,
  // so that we can force new config on remaining leader.
  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  LOG(INFO) << "Forcing unsafe config change on tserver " << leader_ts->uuid();
  const string& leader_addr = Substitute("$0:$1",
                                         leader_ts->registration.rpc_addresses(0).host(),
                                         leader_ts->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { leader_ts->uuid() };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));

  // Check that new config is populated to a new follower: pick any server
  // that wasn't part of the original config.
  vector<TServerDetails*> all_tservers;
  TServerDetails *new_follower = nullptr;
  AppendValuesFromMap(tablet_servers_, &all_tservers);
  for (const auto& ts : all_tservers) {
    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
      new_follower = ts;
      break;
    }
  }
  ASSERT_TRUE(new_follower != nullptr);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 3 voters on new_follower.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_follower, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  LOG(INFO) << "Waiting for Master to see new config...";
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  // Neither of the old followers may appear in the new config.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[0]->uuid());
    ASSERT_NE(uuid, followers[1]->uuid());
  }
}
| |
// Test unsafe config change when the unsafe config contains 2 nodes.
// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
// 2. Shut down leader.
// 3. Trigger unsafe config change on follower1 having follower1 and follower2 in the config.
// 4. Wait until the new config is populated on new_leader and master.
// 5. Verify that new config does not contain old leader.
TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 4;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  LOG(INFO) << "Waiting for Master to see the current replicas...";
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down leader and prepare 2-node config.
  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
  const string& follower1_addr = Substitute("$0:$1",
                                            followers[1]->registration.rpc_addresses(0).host(),
                                            followers[1]->registration.rpc_addresses(0).port());
  // The forced config keeps both surviving followers.
  const vector<string> peer_uuid_list = { followers[0]->uuid(),
                                          followers[1]->uuid(), };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));

  // The master should re-replicate onto the server that hosted no replica.
  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_TRUE(new_node != nullptr);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 3 voters on follower1.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  LOG(INFO) << "Waiting for Master to see new config...";
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
  // The old leader must not appear in the new config.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, leader_ts->uuid());
  }
}
| |
| // Test unsafe config change on a 5-replica tablet when the unsafe config contains 2 nodes. |
| // 1. Instantiate external minicluster with 1 tablet having 5 replicas and 8 TS. |
| // 2. Shut down leader and 2 followers. |
| // 3. Trigger unsafe config change on a surviving follower with those |
| // 2 surviving followers in the new config. |
| // 4. Wait until the new config is populated on new_leader and master. |
| // 5. Verify that new config does not contain old leader and old followers. |
| TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| |
| // Retire the dead servers early with these settings. |
| FLAGS_num_tablet_servers = 8; |
| FLAGS_num_replicas = 5; |
| NO_FATALS(BuildAndStart()); |
| |
| vector<TServerDetails*> tservers; |
| vector<ExternalTabletServer*> external_tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| for (TServerDetails* ts : tservers) { |
| external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid())); |
| } |
| |
| // Determine the list of tablet servers currently in the config. |
| TabletServerMap active_tablet_servers; |
| auto iter = tablet_replicas_.equal_range(tablet_id_); |
| for (auto it = iter.first; it != iter.second; ++it) { |
| InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); |
| } |
| |
| // Get a baseline config reported to the master. |
| LOG(INFO) << "Waiting for Master to see the current replicas..."; |
| master::GetTabletLocationsResponsePB tablet_locations; |
| bool has_leader; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 5, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); |
| |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); |
| vector<TServerDetails*> followers; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); |
| }); |
| ASSERT_EQ(followers.size(), 4); |
| // Wait for initial NO_OP to be committed by the leader. |
| ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout)); |
| |
| cluster_->tablet_server_by_uuid(followers[2]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(followers[3]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown(); |
| // Restart master to cleanup cache of dead servers from its list of candidate |
| // servers to trigger placement of new replicas on healthy servers. |
| cluster_->master()->Shutdown(); |
| ASSERT_OK(cluster_->master()->Restart()); |
| |
| LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid(); |
| const string& follower1_addr = Substitute("$0:$1", |
| followers[1]->registration.rpc_addresses(0).host(), |
| followers[1]->registration.rpc_addresses(0).port()); |
| const vector<string> peer_uuid_list = { followers[0]->uuid(), |
| followers[1]->uuid(), }; |
| ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list)); |
| |
| const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers); |
| ASSERT_TRUE(new_node != nullptr); |
| |
| // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, |
| // so it is safer to wait until consensus metadata has 5 voters back on new_node. |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout)); |
| |
| // Wait for the master to be notified of the config change. |
| LOG(INFO) << "Waiting for Master to see new config..."; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 5, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) { |
| const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid(); |
| ASSERT_NE(uuid, leader_ts->uuid()); |
| ASSERT_NE(uuid, followers[2]->uuid()); |
| ASSERT_NE(uuid, followers[3]->uuid()); |
| } |
| } |
| |
| // Test unsafe config change when there is a pending config on a surviving leader. |
| // 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS. |
| // 2. Shut down both the followers. |
| // 3. Trigger a regular config change on the leader which remains pending on leader. |
| // 4. Trigger unsafe config change on the surviving leader. |
| // 5. Wait until the new config is populated on leader and master. |
| // 6. Verify that new config does not contain old followers and a standby node |
| // has populated the new config. |
TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
  FLAGS_num_tablet_servers = 5;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart());

  // Determine the list of tablet servers currently in the config.
  TabletServerMap active_tablet_servers;
  auto iter = tablet_replicas_.equal_range(tablet_id_);
  for (auto it = iter.first; it != iter.second; ++it) {
    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
  }

  // Get a baseline config reported to the master.
  master::GetTabletLocationsResponsePB tablet_locations;
  bool has_leader;
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);

  TServerDetails* leader_ts;
  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
  vector<TServerDetails*> followers;
  // Finding the followers may transiently fail right after election, so retry.
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
  });
  ASSERT_EQ(2, followers.size());
  // Wait for initial NO_OP to be committed by the leader.
  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout));

  // Shut down servers follower1 and follower2,
  // so that leader can't replicate future config change ops.
  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();

  // Now try to replicate a ChangeConfig operation. This should get stuck and
  // time out because the server can't replicate any operations.
  const auto s = RemoveServer(
      leader_ts, tablet_id_, followers[1], MonoDelta::FromSeconds(2), -1);
  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();

  // Build the leader's host:port and force a new config that contains only
  // the surviving leader.
  const string& leader_addr = Substitute("$0:$1",
                                         leader_ts->registration.rpc_addresses(0).host(),
                                         leader_ts->registration.rpc_addresses(0).port());
  const vector<string> peer_uuid_list = { leader_ts->uuid() };
  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));

  // Restart master to cleanup cache of dead servers from its list of candidate
  // servers to trigger placement of new replicas on healthy servers.
  cluster_->master()->Shutdown();
  ASSERT_OK(cluster_->master()->Restart());

  const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers);
  ASSERT_NE(nullptr, new_node);

  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
  // so it is safer to wait until consensus metadata has 3 voters on new_node.
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));

  // Wait for the master to be notified of the config change.
  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
                                            3, tablet_id_, kTimeout,
                                            WAIT_FOR_LEADER, VOTER_REPLICA,
                                            &has_leader, &tablet_locations));
  // The dead followers must not appear in the new config.
  for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) {
    const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid();
    ASSERT_NE(uuid, followers[0]->uuid());
    ASSERT_NE(uuid, followers[1]->uuid());
  }
}
| |
| // Test unsafe config change when there is a pending config on a surviving follower. |
| // 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS. |
| // 2. Shut down both the followers. |
| // 3. Trigger a regular config change on the leader which remains pending on leader. |
| // 4. Trigger a leader_step_down command such that leader is forced to become follower. |
| // 5. Trigger unsafe config change on the follower. |
| // 6. Wait until the new config is populated on leader and master. |
| // 7. Verify that new config does not contain old followers and a standby node |
| // has populated the new config. |
| TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| FLAGS_num_tablet_servers = 5; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart()); |
| |
| // Determine the list of tablet servers currently in the config. |
| TabletServerMap active_tablet_servers; |
| auto iter = tablet_replicas_.equal_range(tablet_id_); |
| for (auto it = iter.first; it != iter.second; ++it) { |
| InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); |
| } |
| |
| // Get a baseline config reported to the master. |
| LOG(INFO) << "Waiting for Master to see the current replicas..."; |
| master::GetTabletLocationsResponsePB tablet_locations; |
| bool has_leader; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 3, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); |
| |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); |
| vector<TServerDetails*> followers; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); |
| }); |
| // Wait for initial NO_OP to be committed by the leader. |
| ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout)); |
| |
| // Shut down servers follower1 and follower2, |
| // so that leader can't replicate future config change ops. |
| cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown(); |
| // Restart master to cleanup cache of dead servers from its |
| // list of candidate servers to place the new replicas. |
| cluster_->master()->Shutdown(); |
| ASSERT_OK(cluster_->master()->Restart()); |
| |
| // Now try to replicate a ChangeConfig operation. This should get stuck and time out |
| // because the server can't replicate any operations. |
| Status s = RemoveServer(leader_ts, tablet_id_, followers[1], |
| MonoDelta::FromSeconds(2), -1); |
| ASSERT_TRUE(s.IsTimedOut()); |
| |
| // Force leader to step down, best effort command since the leadership |
| // could change anytime during cluster lifetime. |
| string stderr; |
| s = RunKuduTool({ |
| "tablet", |
| "leader_step_down", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| tablet_id_ |
| }, nullptr, &stderr); |
| bool not_currently_leader = stderr.find( |
| Status::IllegalState("").CodeAsString()) != string::npos; |
| ASSERT_TRUE(s.ok() || not_currently_leader) << "stderr: " << stderr; |
| |
| LOG(INFO) << "Change Config Op timed out, Sending a Replace config " |
| << "command when change config op is pending on the leader."; |
| const string& leader_addr = Substitute("$0:$1", |
| leader_ts->registration.rpc_addresses(0).host(), |
| leader_ts->registration.rpc_addresses(0).port()); |
| const vector<string> peer_uuid_list = { leader_ts->uuid() }; |
| ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); |
| |
| const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers); |
| ASSERT_TRUE(new_node != nullptr); |
| |
| // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, |
| // so it is safer to wait until consensus metadata has 3 voters on new_node. |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout)); |
| |
| // Wait for the master to be notified of the config change. |
| LOG(INFO) << "Waiting for Master to see new config..."; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 3, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) { |
| const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid(); |
| ASSERT_NE(uuid, followers[1]->uuid()); |
| ASSERT_NE(uuid, followers[0]->uuid()); |
| } |
| } |
| |
| // Test unsafe config change when there are back to back pending configs on leader logs. |
| // 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS. |
| // 2. Shut down both the followers. |
| // 3. Trigger a regular config change on the leader which remains pending on leader. |
| // 4. Set a fault crash flag to trigger upon next commit of config change. |
| // 5. Trigger unsafe config change on the surviving leader which should trigger |
| // the fault while the old config change is being committed. |
| // 6. Shutdown and restart the leader and verify that tablet bootstrapped on leader. |
| // 7. Verify that a new node has populated the new config with 3 voters. |
| TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| FLAGS_num_tablet_servers = 5; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart()); |
| |
| // Determine the list of tablet servers currently in the config. |
| TabletServerMap active_tablet_servers; |
| auto iter = tablet_replicas_.equal_range(tablet_id_); |
| for (auto it = iter.first; it != iter.second; ++it) { |
| InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); |
| } |
| |
| // Get a baseline config reported to the master. |
| LOG(INFO) << "Waiting for Master to see the current replicas..."; |
| master::GetTabletLocationsResponsePB tablet_locations; |
| bool has_leader; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 3, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); |
| |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); |
| vector<TServerDetails*> followers; |
| ASSERT_EVENTUALLY([&] { |
| ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers)); |
| }); |
| // Wait for initial NO_OP to be committed by the leader. |
| ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id_, COMMITTED_OPID, kTimeout)); |
| |
| // Shut down servers follower1 and follower2, |
| // so that leader can't replicate future config change ops. |
| cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown(); |
| cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown(); |
| |
| // Now try to replicate a ChangeConfig operation. This should get stuck and time out |
| // because the server can't replicate any operations. |
| Status s = RemoveServer(leader_ts, tablet_id_, followers[1], |
| MonoDelta::FromSeconds(2), -1); |
| ASSERT_TRUE(s.IsTimedOut()); |
| |
| LOG(INFO) << "Change Config Op timed out, Sending a Replace config " |
| << "command when change config op is pending on the leader."; |
| const string& leader_addr = Substitute("$0:$1", |
| leader_ts->registration.rpc_addresses(0).host(), |
| leader_ts->registration.rpc_addresses(0).port()); |
| const vector<string> peer_uuid_list = { leader_ts->uuid() }; |
| ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); |
| |
| // Inject the crash via fault_crash_before_cmeta_flush flag. |
| // Tablet will find 2 pending configs back to back during bootstrap, |
| // one from ChangeConfig (RemoveServer) and another from UnsafeChangeConfig. |
| ASSERT_OK(cluster_->SetFlag( |
| cluster_->tablet_server_by_uuid(leader_ts->uuid()), |
| "fault_crash_before_cmeta_flush", "1.0")); |
| |
| const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers); |
| ASSERT_TRUE(new_node != nullptr); |
| // Restart master to cleanup cache of dead servers from its list of candidate |
| // servers to trigger placement of new replicas on healthy servers. |
| cluster_->master()->Shutdown(); |
| ASSERT_OK(cluster_->master()->Restart()); |
| |
| ASSERT_OK(cluster_->tablet_server_by_uuid( |
| leader_ts->uuid())->WaitForInjectedCrash(kTimeout)); |
| |
| cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown(); |
| ASSERT_OK(cluster_->tablet_server_by_uuid( |
| leader_ts->uuid())->Restart()); |
| ASSERT_OK(WaitForNumTabletsOnTS(leader_ts, 1, kTimeout, nullptr)); |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout)); |
| |
| // Wait for the master to be notified of the config change. |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 3, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) { |
| const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid(); |
| ASSERT_NE(uuid, followers[0]->uuid()); |
| ASSERT_NE(uuid, followers[1]->uuid()); |
| } |
| } |
| |
// Test unsafe config change on a 5-replica tablet when there are multiple
// pending configs on the surviving node.
| // 1. Instantiate external minicluster with 1 tablet having 5 replicas and 9 TS. |
| // 2. Shut down all the followers. |
| // 3. Trigger unsafe config changes on the surviving leader with those |
| // dead followers in the new config. |
| // 4. Wait until the new config is populated on the master and the new leader. |
| // 5. Verify that new config does not contain old followers. |
| TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| FLAGS_num_tablet_servers = 9; |
| FLAGS_num_replicas = 5; |
| // Retire the dead servers early with these settings. |
| NO_FATALS(BuildAndStart()); |
| |
| vector<TServerDetails*> tservers; |
| vector<ExternalTabletServer*> external_tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| for (TServerDetails* ts : tservers) { |
| external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid())); |
| } |
| |
| // Determine the list of tablet servers currently in the config. |
| TabletServerMap active_tablet_servers; |
| auto iter = tablet_replicas_.equal_range(tablet_id_); |
| for (auto it = iter.first; it != iter.second; ++it) { |
| InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second); |
| } |
| |
| // Get a baseline config reported to the master. |
| LOG(INFO) << "Waiting for Master to see the current replicas..."; |
| master::GetTabletLocationsResponsePB tablet_locations; |
| bool has_leader; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 5, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations); |
| |
| TServerDetails* leader_ts; |
| ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts)); |
| vector<TServerDetails*> followers; |
| for (const auto& elem : active_tablet_servers) { |
| if (elem.first == leader_ts->uuid()) { |
| continue; |
| } |
| followers.push_back(elem.second); |
| cluster_->tablet_server_by_uuid(elem.first)->Shutdown(); |
| } |
| ASSERT_EQ(4, followers.size()); |
| |
| // Shutdown master to cleanup cache of dead servers from its list of candidate |
| // servers to trigger placement of new replicas on healthy servers when we restart later. |
| cluster_->master()->Shutdown(); |
| |
| const string& leader_addr = Substitute("$0:$1", |
| leader_ts->registration.rpc_addresses(0).host(), |
| leader_ts->registration.rpc_addresses(0).port()); |
| |
| // This should keep the multiple pending configs on the node since we are |
| // adding all the dead followers to the new config, and then eventually we write |
| // just one surviving node to the config. |
| // New config write sequences are: {ABCDE}, {ABCD}, {ABC}, {AB}, {A}, |
| // A being the leader node where config is written and rest of the nodes are |
| // dead followers. |
| for (int num_replicas = followers.size(); num_replicas >= 0; num_replicas--) { |
| vector<string> peer_uuid_list; |
| peer_uuid_list.push_back(leader_ts->uuid()); |
| for (int i = 0; i < num_replicas; i++) { |
| peer_uuid_list.push_back(followers[i]->uuid()); |
| } |
| ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list)); |
| } |
| |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, leader_ts, tablet_id_, kTimeout)); |
| ASSERT_OK(cluster_->master()->Restart()); |
| |
| const TServerDetails* new_node = FindNodeForReReplication(active_tablet_servers); |
| ASSERT_TRUE(new_node != nullptr); |
| |
| // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms, |
| // so it is safer to wait until consensus metadata has 5 voters on new_node. |
| ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout)); |
| |
| // Wait for the master to be notified of the config change. |
| LOG(INFO) << "Waiting for Master to see new config..."; |
| ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(), |
| 5, tablet_id_, kTimeout, |
| WAIT_FOR_LEADER, VOTER_REPLICA, |
| &has_leader, &tablet_locations)); |
| LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations); |
| for (const auto& replica : tablet_locations.tablet_locations(0).interned_replicas()) { |
| // Verify that old followers aren't part of new config. |
| for (const auto& old_follower : followers) { |
| const auto& uuid = tablet_locations.ts_infos(replica.ts_info_idx()).permanent_uuid(); |
| ASSERT_NE(uuid, old_follower->uuid()); |
| } |
| } |
| } |
| |
| Status GetTermFromConsensus(const vector<TServerDetails*>& tservers, |
| const string& tablet_id, |
| int64_t *current_term) { |
| ConsensusStatePB cstate; |
| for (auto& ts : tservers) { |
| RETURN_NOT_OK( |
| GetConsensusState(ts, tablet_id, MonoDelta::FromSeconds(10), EXCLUDE_HEALTH_REPORT, |
| &cstate)); |
| if (!cstate.leader_uuid().empty() && |
| IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config()) && |
| cstate.has_current_term()) { |
| *current_term = cstate.current_term(); |
| return Status::OK(); |
| } |
| } |
| return Status::NotFound(Substitute( |
| "No leader replica found for tablet $0", tablet_id)); |
| } |
| |
| TEST_F(AdminCliTest, TestAbruptLeaderStepDown) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| }; |
| const vector<string> kTserverFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); |
| |
| // Wait for the tablet to be running. |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| for (auto& ts : tservers) { |
| ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); |
| } |
| |
| // Elect the leader and wait for the tservers and master to see the leader. |
| const auto* leader = tservers[0]; |
| ASSERT_OK(StartElection(leader, tablet_id_, kTimeout)); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, |
| tablet_id_, /*minimum_index=*/1)); |
| TServerDetails* master_observed_leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); |
| ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); |
| |
| // Ask the leader to step down. |
| string stderr; |
| Status s = RunKuduTool({ |
| "tablet", |
| "leader_step_down", |
| "--abrupt", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| tablet_id_ |
| }, nullptr, &stderr); |
| |
| // There shouldn't be a leader now, since failure detection is disabled. |
| for (const auto* ts : tservers) { |
| s = GetReplicaStatusAndCheckIfLeader(ts, tablet_id_, kTimeout); |
| ASSERT_TRUE(s.IsIllegalState()) << "Expected IllegalState because replica " |
| "should not be the leader: " << s.ToString(); |
| } |
| } |
| |
TEST_F(AdminCliTest, TestGracefulLeaderStepDown) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  // Disable automatic election on tablet creation and leader failure
  // detection so this test fully controls tablet leadership.
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
  };
  const vector<string> kTserverFlags = {
    "--enable_leader_failure_detection=false",
  };
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Elect the leader and wait for the tservers and master to see the leader.
  const auto* leader = tservers[0];
  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
                                  tablet_id_, /*minimum_index=*/1));
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());

  // Ask the leader to transfer leadership to a specific peer.
  const auto new_leader_uuid = tservers[1]->uuid();
  string stderr;
  Status s = RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--new_leader_uuid=$0", new_leader_uuid),
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  ASSERT_TRUE(s.ok()) << s.ToString();

  // Eventually, the chosen node should become leader.
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
    ASSERT_EQ(new_leader_uuid, master_observed_leader->uuid());
  });

  // Ask the leader to transfer leadership. Without --new_leader_uuid the
  // choice of successor is left to the tool/leader.
  s = RunKuduTool({
    "tablet",
    "leader_step_down",
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  ASSERT_TRUE(s.ok()) << s.ToString();

  // Eventually, some other node should become leader.
  const std::unordered_set<string> possible_new_leaders = { tservers[0]->uuid(),
                                                            tservers[2]->uuid() };
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
    ASSERT_TRUE(ContainsKey(possible_new_leaders, master_observed_leader->uuid()));
  });
}
| |
| // Test current_leader_uuid flag used to transfer leadership from current node |
| // 1. Elect a leader and transfer leadership to a new node. |
| // 2. Input new node uuid as an argument to flag current_leader_uuid. |
| // 3. Transfer the leadership from current leader to another node. |
| // 4. Verify the new leader chosen is one of the two non-leader nodes. |
TEST_F(AdminCliTest, TestGracefulSpecificLeaderStepDown) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
  };

  // For deterministic control over manual leader selection
  const vector<string> kTserverFlags = {
    "--enable_leader_failure_detection=false",
  };
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Elect the leader and wait for the tservers and master to see the leader.
  const auto* leader = tservers[0];
  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
                                  tablet_id_, /*minimum_index=*/1));
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());

  const auto leader_uuid = leader->uuid();
  string stderr;

  // Ask the leader to transfer leadership and specify the current leader uuid.
  // Use the leader elected above as current leader input, instead of relying on
  // master configuration. This is to check whether user specified current
  // leader uuid is ok to be used as input parameter for new leader election.
  // Generally, we rely on master configuration to find that out. But for cases
  // where master configuration is out of sync, user input parameter for
  // current leader should work just as well.
  Status s = RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--current_leader_uuid=$0", leader_uuid),
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  ASSERT_OK(s);

  // Eventually, a replica at other node should become a leader.
  const std::unordered_set<string> possible_new_leaders = { tservers[1]->uuid(),
                                                            tservers[2]->uuid() };
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
    ASSERT_TRUE(ContainsKey(possible_new_leaders, master_observed_leader->uuid()));
  });

  // Negative case: When current_leader_uuid belongs to a tablet server
  // where a follower (and not a leader) replica is running.
  // After the transfer above, leader_uuid no longer identifies the actual
  // leader, so the tool is expected to fail (surfaced as RuntimeError).
  s = RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--current_leader_uuid=$0", leader_uuid),
    cluster_->master()->bound_rpc_addr().ToString(),
    tablet_id_
  }, nullptr, &stderr);
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
}
| |
| // Leader should reject requests to transfer leadership to a non-member of the |
| // config. |
TEST_F(AdminCliTest, TestLeaderTransferToEvictedPeer) {
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
  // In this test, tablet leadership is manually controlled and the master
  // should not rereplicate.
  const vector<string> kMasterFlags = {
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
    "--master_add_server_when_underreplicated=false",
  };
  const vector<string> kTserverFlags = {
    "--enable_leader_failure_detection=false",
  };
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;
  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));

  // Wait for the tablet to be running.
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
  for (auto& ts : tservers) {
    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
  }

  // Elect the leader and wait for the tservers and master to see the leader.
  const auto* leader = tservers[0];
  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
                                  tablet_id_, /*minimum_index=*/1));
  TServerDetails* master_observed_leader;
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());

  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();

  // Evict the first follower from the config via the CLI tool.
  string stderr;
  const auto evicted_uuid = tservers[1]->uuid();
  Status s = RunKuduTool({
    "tablet",
    "change_config",
    "remove_replica",
    master_addr,
    tablet_id_,
    evicted_uuid,
  }, nullptr, &stderr);
  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;

  // Ask the leader to transfer leadership to the evicted peer. The request
  // must be rejected, and the tool's stderr must name the reason.
  stderr.clear();
  s = RunKuduTool({
    "tablet",
    "leader_step_down",
    Substitute("--new_leader_uuid=$0", evicted_uuid),
    master_addr,
    tablet_id_
  }, nullptr, &stderr);
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr;
  ASSERT_STR_CONTAINS(stderr,
      Substitute("tablet server $0 is not a voter in the active config",
                 evicted_uuid));
}
| |
| // Leader should reject requests to transfer leadership to a non-voter of the |
| // config. |
| TEST_F(AdminCliTest, TestLeaderTransferToNonVoter) { |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| // In this test, tablet leadership is manually controlled and the master |
| // should not rereplicate. |
| const vector<string> kMasterFlags = { |
| "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
| "--master_add_server_when_underreplicated=false", |
| }; |
| const vector<string> kTserverFlags = { |
| "--enable_leader_failure_detection=false", |
| }; |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); |
| |
| // Wait for the tablet to be running. |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| for (auto& ts : tservers) { |
| ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); |
| } |
| |
| // Elect the leader and wait for the tservers and master to see the leader. |
| const auto* leader = tservers[0]; |
| ASSERT_OK(StartElection(leader, tablet_id_, kTimeout)); |
| ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, |
| tablet_id_, /*minimum_index=*/1)); |
| TServerDetails* master_observed_leader; |
| ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader)); |
| ASSERT_EQ(leader->uuid(), master_observed_leader->uuid()); |
| |
| const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| // Demote the first follower to a non-voter. |
| string stderr; |
| const auto non_voter_uuid = tservers[1]->uuid(); |
| Status s = RunKuduTool({ |
| "tablet", |
| "change_config", |
| "change_replica_type", |
| master_addr, |
| tablet_id_, |
| non_voter_uuid, |
| "NON_VOTER", |
| }, nullptr, &stderr); |
| ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr; |
| |
| // Ask the leader to transfer leadership to the non-voter. |
| stderr.clear(); |
| s = RunKuduTool({ |
| "tablet", |
| "leader_step_down", |
| Substitute("--new_leader_uuid=$0", non_voter_uuid), |
| master_addr, |
| tablet_id_ |
| }, nullptr, &stderr); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr; |
| ASSERT_STR_CONTAINS(stderr, |
| Substitute("tablet server $0 is not a voter in the active config", |
| non_voter_uuid)); |
| } |
| |
| // Leader transfer causes the tablet to stop accepting new writes. This test |
| // tests that writes can still succeed even if lots of leader transfers and |
| // abrupt stepdowns are happening, as long as the writes have long enough |
| // timeouts to ride over the unstable leadership. |
| TEST_F(AdminCliTest, TestSimultaneousLeaderTransferAndAbruptStepdown) { |
| SKIP_IF_SLOW_NOT_ALLOWED(); |
| |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart()); |
| |
| // Wait for the tablet to be running. |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| for (auto& ts : tservers) { |
| ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout)); |
| } |
| |
| // Start a workload with long timeouts. Everything should eventually go |
| // through but it might take a while given the leadership changes. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(false); |
| workload.set_write_timeout_millis(60000); |
| workload.set_num_replicas(FLAGS_num_replicas); |
| workload.set_num_write_threads(1); |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // Sometimes this test is flaky under ASAN and the writes are never able to |
| // complete, so we'll back off on how frequently we disrupt leadership to give |
| // time for progress to be made. |
| #if defined(ADDRESS_SANITIZER) |
| const auto leader_change_period_sec = MonoDelta::FromMilliseconds(6000); |
| #else |
| const auto leader_change_period_sec = MonoDelta::FromMilliseconds(2000); |
| #endif |
| |
| const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| while (workload.rows_inserted() < 1000) { |
| // Issue a graceful stepdown and then an abrupt stepdown, every |
| // 'leader_change_period_sec' seconds. The results are ignored because the |
| // tools might fail due to the constant leadership changes. |
| ignore_result(RunKuduTool({ |
| "tablet", |
| "leader_step_down", |
| master_addr, |
| tablet_id_ |
| })); |
| ignore_result(RunKuduTool({ |
| "tablet", |
| "leader_step_down", |
| "--abrupt", |
| master_addr, |
| tablet_id_ |
| })); |
| SleepFor(leader_change_period_sec); |
| } |
| } |
| |
// Parameterized fixture for leader step-down scenarios; the parameter selects
// the step-down mode (abrupt step-down vs. graceful leadership transfer).
class TestLeaderStepDown :
    public AdminCliTest,
    public ::testing::WithParamInterface<LeaderStepDownMode> {
};
// Run every TEST_P in this fixture once per step-down mode.
INSTANTIATE_TEST_SUITE_P(, TestLeaderStepDown,
                         ::testing::Values(LeaderStepDownMode::ABRUPT,
                                           LeaderStepDownMode::GRACEFUL));
| TEST_P(TestLeaderStepDown, TestLeaderStepDownWhenNotPresent) { |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| NO_FATALS(BuildAndStart( |
| { "--enable_leader_failure_detection=false" }, |
| { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" })); |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| for (auto& ts : tservers) { |
| ASSERT_OK(WaitUntilTabletRunning(ts, |
| tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| } |
| |
| int64_t current_term; |
| ASSERT_TRUE(GetTermFromConsensus(tservers, tablet_id_, |
| ¤t_term).IsNotFound()); |
| string stdout; |
| ASSERT_OK(RunKuduTool({ |
| "tablet", |
| "leader_step_down", |
| Substitute("--abrupt=$0", GetParam() == LeaderStepDownMode::ABRUPT), |
| cluster_->master()->bound_rpc_addr().ToString(), |
| tablet_id_ |
| }, &stdout)); |
| ASSERT_STR_CONTAINS(stdout, |
| Substitute("No leader replica found for tablet $0", |
| tablet_id_)); |
| } |
| |
| TEST_P(TestLeaderStepDown, TestRepeatedLeaderStepDown) { |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| // Speed up leader failure detection and shorten the leader transfer period. |
| NO_FATALS(BuildAndStart({ "--raft_heartbeat_interval_ms=50" })); |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
| for (auto& ts : tservers) { |
| ASSERT_OK(WaitUntilTabletRunning(ts, |
| tablet_id_, |
| MonoDelta::FromSeconds(10))); |
| } |
| |
| // Start a workload. |
| TestWorkload workload(cluster_.get()); |
| workload.set_table_name(kTableId); |
| workload.set_timeout_allowed(false); |
| workload.set_write_timeout_millis(30000); |
| workload.set_num_replicas(FLAGS_num_replicas); |
| workload.set_num_write_threads(4); |
| workload.set_write_batch_size(1); |
| workload.Setup(); |
| workload.Start(); |
| |
| // Issue stepdown requests repeatedly. If we leave some time for an election, |
| // the workload should still make progress. |
| const string abrupt_flag = Substitute("--abrupt=$0", |
| GetParam() == LeaderStepDownMode::ABRUPT); |
| string stdout; |
| string stderr; |
| while (workload.rows_inserted() < 2000) { |
| stdout.clear(); |
| stderr.clear(); |
| Status s = RunKuduTool({ |
| "tablet", |
| "leader_step_down", |
| abrupt_flag, |
| cluster_->master()->bound_rpc_addr().ToString(), |
| tablet_id_ |
| }, &stdout, &stderr); |
| bool not_currently_leader = stderr.find( |
| Status::IllegalState("").CodeAsString()) != string::npos; |
| ASSERT_TRUE(s.ok() || not_currently_leader) << s.ToString(); |
| SleepFor(MonoDelta::FromMilliseconds(1000)); |
| } |
| |
| ClusterVerifier(cluster_.get()).CheckCluster(); |
| } |
| |
| TEST_F(AdminCliTest, TestDeleteTable) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| NO_FATALS(BuildAndStart()); |
| |
| string master_address = cluster_->master()->bound_rpc_addr().ToString(); |
| shared_ptr<KuduClient> client; |
| ASSERT_OK(KuduClientBuilder() |
| .add_master_server_addr(master_address) |
| .Build(&client)); |
| |
| ASSERT_TOOL_OK( |
| "table", |
| "delete", |
| master_address, |
| kTableId |
| ); |
| |
| vector<string> tables; |
| ASSERT_OK(client->ListTables(&tables)); |
| ASSERT_TRUE(tables.empty()); |
| } |
| |
| TEST_F(AdminCliTest, TestListSoftDeletedTables) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| { |
| string stdout; |
| string stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "list", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| ASSERT_STR_CONTAINS(stdout, kTableId); |
| } |
| |
| { |
| string stdout; |
| ASSERT_OK(RunKuduTool({ |
| "table", |
| "list", |
| "-soft_deleted_only=true", |
| cluster_->master()->bound_rpc_addr().ToString() |
| }, &stdout)); |
| |
| vector<string> stdout_lines = Split(stdout, ",", strings::SkipEmpty()); |
| ASSERT_TRUE(stdout_lines.empty()); |
| |
| ASSERT_OK(RunKuduTool({ |
| "table", |
| "delete", |
| "--reserve_seconds=300", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableId |
| }, &stdout)); |
| |
| stdout.clear(); |
| string stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "list", |
| "-soft_deleted_only=true", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| ASSERT_STR_CONTAINS(stdout, kTableId); |
| } |
| } |
| |
// Output formats supported by the 'kudu table list' tool, used to
// parameterize the table-listing test suites below.
static vector<string> TableListFormat() {
  vector<string> formats;
  formats.reserve(3);
  formats.emplace_back("pretty");
  formats.emplace_back("json");
  formats.emplace_back("json_compact");
  return formats;
}
| |
// Parameterized fixture for basic 'kudu table list' tests; the parameter is
// the output format passed via --list_table_output_format.
class ListTableCliSimpleParamTest :
    public AdminCliTest,
    public ::testing::WithParamInterface<string> {
};

// Run the tests once per supported output format.
INSTANTIATE_TEST_SUITE_P(, ListTableCliSimpleParamTest,
                         ::testing::ValuesIn(TableListFormat()));
| |
| TEST_P(ListTableCliSimpleParamTest, TestListTables) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| string stdout; |
| string stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "list", |
| Substitute("--list_table_output_format=$0", GetParam()), |
| cluster_->master()->bound_rpc_addr().ToString(), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| if (GetParam() == "pretty") { |
| vector<string> stdout_lines = Split(stdout, ",", strings::SkipEmpty()); |
| ASSERT_EQ(1, stdout_lines.size()); |
| ASSERT_STR_CONTAINS(stdout, Substitute("$0\n", kTableId)); |
| } else if (GetParam() == "json") { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kTableId)); |
| } else if (GetParam() == "json_compact") { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kTableId)); |
| } else { |
| FAIL() << "unexpected table list format" << GetParam(); |
| } |
| } |
| |
// Parameterized fixture for 'kudu table list --list_tablets' tests; the
// parameter is the output format passed via --list_table_output_format.
class ListTableCliDetailParamTest :
    public AdminCliTest,
    public ::testing::WithParamInterface<string> {
};

// Run the tests once per supported output format.
INSTANTIATE_TEST_SUITE_P(, ListTableCliDetailParamTest,
                         ::testing::ValuesIn(TableListFormat()));
| |
| TEST_P(ListTableCliDetailParamTest, TestListTablesDetail) { |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| // Add another table to test multiple tables output. |
| const string kAnotherTableId = "TestAnotherTable"; |
| auto client_schema = KuduSchema::FromSchema(schema_); |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kAnotherTableId) |
| .schema(&client_schema) |
| .set_range_partition_columns({ "key" }) |
| .num_replicas(FLAGS_num_replicas) |
| .Create()); |
| |
| // Grab list of tablet_ids from any tserver. |
| vector<TServerDetails*> tservers; |
| vector<string> tablet_ids; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ListRunningTabletIds(tservers.front(), |
| MonoDelta::FromSeconds(30), &tablet_ids); |
| |
| string stdout; |
| string stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "list", |
| "--list_tablets", |
| Substitute("--list_table_output_format=$0", GetParam()), |
| cluster_->master()->bound_rpc_addr().ToString(), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| if (GetParam() == "pretty") { |
| vector<string> stdout_lines = Split(stdout, "\n", strings::SkipEmpty()); |
| |
| // Verify multiple tables along with their tablets and replica-uuids. |
| ASSERT_STR_CONTAINS(stdout, kTableId); |
| ASSERT_STR_CONTAINS(stdout, kAnotherTableId); |
| ASSERT_STR_CONTAINS(stdout, tablet_ids.front()); |
| ASSERT_STR_CONTAINS(stdout, tablet_ids.back()); |
| |
| for (auto& ts : tservers) { |
| ASSERT_STR_CONTAINS(stdout, ts->uuid()); |
| ASSERT_STR_CONTAINS(stdout, ts->uuid()); |
| } |
| } else if (GetParam() == "json") { |
| // The 'json' format output should contain the table id for the table. |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kTableId)); |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kAnotherTableId)); |
| for (auto& tablet_id : tablet_ids) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\": \"$0\"", tablet_id)); |
| } |
| |
| for (auto& ts : tservers) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\": \"$0\"", ts->uuid())); |
| } |
| } else if (GetParam() == "json_compact") { |
| // The 'json_compact' format output should contain the table id for the table. |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kTableId)); |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kAnotherTableId)); |
| for (auto& tablet_id : tablet_ids) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\":\"$0\"", tablet_id)); |
| } |
| |
| for (auto& ts : tservers) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\":\"$0\"", ts->uuid())); |
| } |
| } else { |
| FAIL() << "unexpected table list format" << GetParam(); |
| } |
| } |
| |
| TEST_F(AdminCliTest, TestGetTableStatistics) { |
| vector<string> master_flags{ "--mock_table_metrics_for_testing=true", |
| "--on_disk_size_for_testing=1024", |
| "--live_row_count_for_testing=1000" }; |
| |
| NO_FATALS(BuildAndStart({}, master_flags)); |
| |
| string stdout, stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "statistics", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableId |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| ASSERT_STR_CONTAINS(stdout, "on disk size: 1024\n" |
| "live row count: 1000\n"); |
| } |
| |
// Exercises 'kudu table describe' on the default pre-created table and then
// on a purpose-built table covering many column types, two levels of hash
// partitioning, multiple range partitions, and non-covered ranges — in the
// default mode, with attribute/comment flags, and in avro-schema mode.
TEST_F(AdminCliTest, TestDescribeTable) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;

  NO_FATALS(BuildAndStart());

  // The default table has a range partition with only one partition.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kTableId
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  ASSERT_STR_CONTAINS(stdout,
                      "(\n"
                      " key INT32 NOT NULL,\n"
                      " int_val INT32 NOT NULL,\n"
                      " string_val STRING NULLABLE,\n"
                      " PRIMARY KEY (key)\n"
                      ")\n"
                      "RANGE (key) (\n"
                      " PARTITION UNBOUNDED"
                      "\n"
                      ")\n"
                      "OWNER alice\n"
                      "REPLICAS 1\n"
                      "COMMENT ");

  // Test a table with all types in its schema, multiple hash partitioning
  // levels, multiple range partitions, and non-covered ranges.
  const string kAnotherTableId = "TestAnotherTable";
  KuduSchema schema;

  // Build the schema. Columns exercise a spread of encodings, compression
  // codecs, defaults, and a per-column comment so all of them show up in the
  // describe output below.
  {
    KuduSchemaBuilder builder;
    builder.AddColumn("key_hash0")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash1")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_hash2")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull();
    builder.AddColumn("int8_val")->Type(KuduColumnSchema::INT8)
        ->Compression(KuduColumnStorageAttributes::CompressionType::NO_COMPRESSION)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::PLAIN_ENCODING);
    builder.AddColumn("int16_val")->Type(KuduColumnSchema::INT16)
        ->Compression(KuduColumnStorageAttributes::CompressionType::SNAPPY)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::RLE);
    builder.AddColumn("int32_val")->Type(KuduColumnSchema::INT32)
        ->Compression(KuduColumnStorageAttributes::CompressionType::LZ4)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::BIT_SHUFFLE);
    builder.AddColumn("int64_val")->Type(KuduColumnSchema::INT64)
        ->Compression(KuduColumnStorageAttributes::CompressionType::ZLIB)
        ->Default(KuduValue::FromInt(123));
    builder.AddColumn("timestamp_val")->Type(KuduColumnSchema::UNIXTIME_MICROS);
    builder.AddColumn("date_val")->Type(KuduColumnSchema::DATE);
    builder.AddColumn("string_val")->Type(KuduColumnSchema::STRING)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::PREFIX_ENCODING)
        ->Default(KuduValue::CopyString(Slice("hello")))
        ->Comment("comment for hello");
    builder.AddColumn("bool_val")->Type(KuduColumnSchema::BOOL)
        ->Default(KuduValue::FromBool(false));
    builder.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
    builder.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE)
        ->Default(KuduValue::FromDouble(123.4));
    builder.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY)
        ->Encoding(KuduColumnStorageAttributes::EncodingType::DICT_ENCODING);
    builder.AddColumn("decimal_val")->Type(KuduColumnSchema::DECIMAL)
        ->Precision(30)
        ->Scale(4);
    builder.SetPrimaryKey({ "key_hash0", "key_hash1", "key_hash2", "key_range" });
    ASSERT_OK(builder.Build(&schema));
  }

  // Set up partitioning and create the table: two hash levels plus two range
  // partitions ([0, 1) and [2, 3)) leaving a non-covered gap in between.
  {
    unique_ptr<KuduPartialRow> lower_bound0(schema.NewRow());
    ASSERT_OK(lower_bound0->SetInt32("key_range", 0));
    unique_ptr<KuduPartialRow> upper_bound0(schema.NewRow());
    ASSERT_OK(upper_bound0->SetInt32("key_range", 1));
    unique_ptr<KuduPartialRow> lower_bound1(schema.NewRow());
    ASSERT_OK(lower_bound1->SetInt32("key_range", 2));
    unique_ptr<KuduPartialRow> upper_bound1(schema.NewRow());
    ASSERT_OK(upper_bound1->SetInt32("key_range", 3));
    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
    ASSERT_OK(table_creator->table_name(kAnotherTableId)
              .schema(&schema)
              .add_hash_partitions({"key_hash0"}, 2)
              .add_hash_partitions({"key_hash1", "key_hash2"}, 3)
              .set_range_partition_columns({"key_range"})
              .add_range_partition(lower_bound0.release(), upper_bound0.release())
              .add_range_partition(lower_bound1.release(), upper_bound1.release())
              .num_replicas(FLAGS_num_replicas)
              .set_owner("alice")
              .set_comment("table comment")
              .Create());
  }

  // OK, all that busywork is done. Test the describe output.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  ASSERT_STR_CONTAINS(stdout,
                      "(\n"
                      " key_hash0 INT32 NOT NULL,\n"
                      " key_hash1 INT32 NOT NULL,\n"
                      " key_hash2 INT32 NOT NULL,\n"
                      " key_range INT32 NOT NULL,\n"
                      " int8_val INT8 NULLABLE,\n"
                      " int16_val INT16 NULLABLE,\n"
                      " int32_val INT32 NULLABLE,\n"
                      " int64_val INT64 NULLABLE,\n"
                      " timestamp_val UNIXTIME_MICROS NULLABLE,\n"
                      " date_val DATE NULLABLE,\n"
                      " string_val STRING NULLABLE,\n"
                      " bool_val BOOL NULLABLE,\n"
                      " float_val FLOAT NULLABLE,\n"
                      " double_val DOUBLE NULLABLE,\n"
                      " binary_val BINARY NULLABLE,\n"
                      " decimal_val DECIMAL(30, 4) NULLABLE,\n"
                      " PRIMARY KEY (key_hash0, key_hash1, key_hash2, key_range)\n"
                      ")\n"
                      "HASH (key_hash0) PARTITIONS 2,\n"
                      "HASH (key_hash1, key_hash2) PARTITIONS 3,\n"
                      "RANGE (key_range) (\n"
                      " PARTITION 0 <= VALUES < 1,\n"
                      " PARTITION 2 <= VALUES < 3\n"
                      ")\n"
                      "OWNER alice\n"
                      "REPLICAS 1\n"
                      "COMMENT table comment");

  // Test the describe output with `-show_attributes=true`.
  stdout.clear();
  stderr.clear();
  s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
    "-show_attributes=true",
    "-show_column_comment=true"
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  ASSERT_STR_CONTAINS(
      stdout,
      "(\n"
      " key_hash0 INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " key_hash1 INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " key_hash2 INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " key_range INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " int8_val INT8 NULLABLE PLAIN_ENCODING NO_COMPRESSION - -,\n"
      " int16_val INT16 NULLABLE RLE SNAPPY - -,\n"
      " int32_val INT32 NULLABLE BIT_SHUFFLE LZ4 - -,\n"
      " int64_val INT64 NULLABLE AUTO_ENCODING ZLIB 123 123,\n"
      " timestamp_val UNIXTIME_MICROS NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " date_val DATE NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " string_val STRING NULLABLE PREFIX_ENCODING DEFAULT_COMPRESSION \"hello\" \"hello\" "
      "comment for hello,\n"
      " bool_val BOOL NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION false false,\n"
      " float_val FLOAT NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " double_val DOUBLE NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION 123.4 123.4,\n"
      " binary_val BINARY NULLABLE DICT_ENCODING DEFAULT_COMPRESSION - -,\n"
      " decimal_val DECIMAL(30, 4) NULLABLE AUTO_ENCODING DEFAULT_COMPRESSION - -,\n"
      " PRIMARY KEY (key_hash0, key_hash1, key_hash2, key_range)\n"
      ")\n"
      "HASH (key_hash0) PARTITIONS 2,\n"
      "HASH (key_hash1, key_hash2) PARTITIONS 3,\n"
      "RANGE (key_range) (\n"
      " PARTITION 0 <= VALUES < 1,\n"
      " PARTITION 2 <= VALUES < 3\n"
      ")\n"
      "OWNER alice\n"
      "REPLICAS 1");

  // Test the describe output with `-show_avro_format_schema`.
  // NOTE(review): unlike the runs above, stdout/stderr are not cleared here;
  // this presumably relies on RunKuduTool() overwriting the output strings —
  // confirm before depending on it.
  s = RunKuduTool({
    "table",
    "describe",
    cluster_->master()->bound_rpc_addr().ToString(),
    kAnotherTableId,
    "-show_avro_format_schema"
  }, &stdout, &stderr);
  ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);

  ASSERT_STR_CONTAINS(
      stdout,
      Substitute(
          "{\n"
          " \"type\": \"table\",\n"
          " \"name\": \"TestAnotherTable\",\n"
          " \"namespace\": \"kudu.cluster.$0\",\n"
          " \"fields\": [\n"
          " {\n"
          " \"name\": \"key_hash0\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"key_hash1\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"key_hash2\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"key_range\",\n"
          " \"type\": \"int\"\n"
          " },\n"
          " {\n"
          " \"name\": \"int8_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"int\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"int16_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"int\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"int32_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"int\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"int64_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"long\"\n"
          " ],\n"
          " \"default\": \"123\"\n"
          " },\n"
          " {\n"
          " \"name\": \"timestamp_val\",\n"
          " \"type\": [\n"
          " {\n"
          " \"type\": \"long\",\n"
          " \"logicalType\": \"time-micros\"\n"
          " }\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"date_val\",\n"
          " \"type\": [\n"
          " {\n"
          " \"type\": \"int\",\n"
          " \"logicalType\": \"date\"\n"
          " }\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"string_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"string\"\n"
          " ],\n"
          " \"default\": \"\\\"hello\\\"\"\n"
          " },\n"
          " {\n"
          " \"name\": \"bool_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"bool\"\n"
          " ],\n"
          " \"default\": \"false\"\n"
          " },\n"
          " {\n"
          " \"name\": \"float_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"float\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"double_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"double\"\n"
          " ],\n"
          " \"default\": \"123.4\"\n"
          " },\n"
          " {\n"
          " \"name\": \"binary_val\",\n"
          " \"type\": [\n"
          " \"null\",\n"
          " \"bytes\"\n"
          " ]\n"
          " },\n"
          " {\n"
          " \"name\": \"decimal_val\",\n"
          " \"type\": [\n"
          " {\n"
          " \"type\": \"bytes\",\n"
          " \"logicalType\": \"decimal\"\n"
          " }\n"
          " ]\n"
          " }\n"
          " ]\n"
          "}\n",
          client_->cluster_id())
  );
}
| |
| // Simple tests to check whether the column describing flags |
| // work, alone and together as well. |
| TEST_F(AdminCliTest, TestDescribeTableColumnFlags) { |
| |
| NO_FATALS(BuildAndStart()); |
| string stdout; |
| const string kTableName = "TestAnotherTable"; |
| |
| { |
| // Build the schema |
| KuduSchema schema; |
| KuduSchemaBuilder builder; |
| builder.AddColumn("foo")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("bar")->Type(KuduColumnSchema::INT32)->NotNull() |
| ->Comment("comment for bar"); |
| builder.SetPrimaryKey({"foo"}); |
| ASSERT_OK(builder.Build(&schema)); |
| |
| // Create the table |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTableName) |
| .schema(&schema) |
| .set_range_partition_columns({"foo"}) |
| .Create()); |
| } |
| |
| // Test the describe output with flags |
| ASSERT_OK(RunKuduTool({"table", |
| "describe", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableName, |
| "-show_column_comment"}, |
| &stdout)); |
| ASSERT_STR_CONTAINS(stdout, |
| "(\n" |
| " foo INT32 NOT NULL,\n" |
| " bar INT32 NOT NULL comment for bar,\n" |
| " PRIMARY KEY (foo)\n" |
| ")\n"); |
| stdout.clear(); |
| |
| ASSERT_OK(RunKuduTool({"table", |
| "describe", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableName, |
| "-show_attributes"}, |
| &stdout)); |
| ASSERT_STR_CONTAINS(stdout, |
| "(\n" |
| " foo INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n" |
| " bar INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n" |
| " PRIMARY KEY (foo)\n" |
| ")\n"); |
| stdout.clear(); |
| |
| ASSERT_OK(RunKuduTool({"table", |
| "describe", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableName, |
| "-show_attributes", |
| "-show_column_comment"}, |
| &stdout)); |
| ASSERT_STR_CONTAINS( |
| stdout, |
| "(\n" |
| " foo INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - -,\n" |
| " bar INT32 NOT NULL AUTO_ENCODING DEFAULT_COMPRESSION - - comment for bar,\n" |
| " PRIMARY KEY (foo)\n" |
| ")\n"); |
| } |
| |
| TEST_F(AdminCliTest, TestDescribeTableNoOwner) { |
| NO_FATALS(BuildAndStart({}, {"--allow_empty_owner=true"})); |
| KuduSchema schema; |
| KuduSchemaBuilder builder; |
| builder.AddColumn("foo")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); |
| ASSERT_OK(builder.Build(&schema)); |
| |
| const string kTableName = "table_without_owner"; |
| |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTableName) |
| .schema(&schema) |
| .set_range_partition_columns({"foo"}) |
| .num_replicas(1) |
| .set_owner("") |
| .Create()); |
| |
| string stdout; |
| ASSERT_OK(RunKuduTool( |
| { |
| "table", |
| "describe", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableName, |
| }, |
| &stdout)); |
| ASSERT_STR_CONTAINS(stdout, "OWNER \n"); |
| } |
| |
| TEST_F(AdminCliTest, TestDescribeTableCustomHashSchema) { |
| NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false)); |
| KuduSchema schema; |
| |
| // Build the schema |
| { |
| KuduSchemaBuilder builder; |
| builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("key_hash0")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("key_hash1")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.SetPrimaryKey({"key_range", "key_hash0", "key_hash1"}); |
| ASSERT_OK(builder.Build(&schema)); |
| } |
| |
| constexpr const char* const kTableName = "table_with_custom_hash_schema"; |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| table_creator->table_name(kTableName) |
| .schema(&schema) |
| .add_hash_partitions({"key_hash0"}, 2) |
| .set_range_partition_columns({"key_range"}) |
| .num_replicas(1); |
| |
| // Create a KuduRangePartition with custom hash schema |
| { |
| unique_ptr<KuduPartialRow> lower(schema.NewRow()); |
| CHECK_OK(lower->SetInt32("key_range", 0)); |
| unique_ptr<KuduPartialRow> upper(schema.NewRow()); |
| CHECK_OK(upper->SetInt32("key_range", 100)); |
| unique_ptr<client::KuduRangePartition> partition( |
| new client::KuduRangePartition(lower.release(), upper.release())); |
| partition->add_hash_partitions({"key_hash1"}, 3); |
| table_creator->add_custom_range_partition(partition.release()); |
| } |
| |
| // Create a partition with table wide hash schema |
| { |
| unique_ptr<KuduPartialRow> lower(schema.NewRow()); |
| CHECK_OK(lower->SetInt32("key_range", 100)); |
| unique_ptr<KuduPartialRow> upper(schema.NewRow()); |
| CHECK_OK(upper->SetInt32("key_range", 200)); |
| table_creator->add_range_partition(lower.release(), upper.release()); |
| } |
| |
| // Create the table and run the tool |
| ASSERT_OK(table_creator->Create()); |
| string stdout; |
| ASSERT_OK(RunKuduTool( |
| { |
| "table", |
| "describe", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableName, |
| }, |
| &stdout)); |
| ASSERT_STR_CONTAINS(stdout, "PARTITION 0 <= VALUES < 100 HASH(key_hash1) PARTITIONS 3,\n" |
| " PARTITION 100 <= VALUES < 200"); |
| } |
| |
// Parameterized fixture for 'kudu table list' with tablet/partition details.
// The tuple parameter is (show tablet partition info, show hash partition
// info, output format).
class ListTableCliParamTest :
    public AdminCliTest,
    public ::testing::WithParamInterface<tuple<bool /* show_tablet_partition_info*/,
                                               bool /* show_hash_partition_info*/,
                                               string /* output format*/>> {
};

// Run the tests over every combination of the two booleans and each format.
INSTANTIATE_TEST_SUITE_P(, ListTableCliParamTest,
                         ::testing::Combine(::testing::Bool(),
                                            ::testing::Bool(),
                                            ::testing::ValuesIn(TableListFormat())));
| |
| // Basic test that the kudu tool works in the list tablets case. |
| TEST_P(ListTableCliParamTest, ListTabletWithPartitionInfo) { |
| const auto show_tp = std::get<0>(GetParam()) ? PartitionSchema::HashPartitionInfo::SHOW : |
| PartitionSchema::HashPartitionInfo::HIDE; |
| const auto show_hp = std::get<1>(GetParam()) ? PartitionSchema::HashPartitionInfo::SHOW : |
| PartitionSchema::HashPartitionInfo::HIDE; |
| const auto kTimeout = MonoDelta::FromSeconds(30); |
| |
| // This combination of parameters does not meet expectations. |
| if (show_tp == PartitionSchema::HashPartitionInfo::HIDE && |
| show_hp == PartitionSchema::HashPartitionInfo::SHOW) { |
| return; |
| } |
| |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| vector<TServerDetails*> tservers; |
| vector<string> base_tablet_ids; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ListRunningTabletIds(tservers.front(), kTimeout, &base_tablet_ids); |
| |
| // Test a table with all types in its schema, multiple hash partitioning |
| // levels, multiple range partitions, and non-covered ranges. |
| constexpr const char* const kTableName = "TestTableListPartition"; |
| KuduSchema schema; |
| |
| // Build the schema. |
| { |
| KuduSchemaBuilder builder; |
| builder.AddColumn("key_hash0")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("key_hash1")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("key_hash2")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.SetPrimaryKey({ "key_hash0", "key_hash1", "key_hash2", "key_range" }); |
| ASSERT_OK(builder.Build(&schema)); |
| } |
| |
| // Set up partitioning and create the table. |
| { |
| unique_ptr<KuduPartialRow> lower_bound0(schema.NewRow()); |
| ASSERT_OK(lower_bound0->SetInt32("key_range", 0)); |
| unique_ptr<KuduPartialRow> upper_bound0(schema.NewRow()); |
| ASSERT_OK(upper_bound0->SetInt32("key_range", 1)); |
| unique_ptr<KuduPartialRow> lower_bound1(schema.NewRow()); |
| ASSERT_OK(lower_bound1->SetInt32("key_range", 2)); |
| unique_ptr<KuduPartialRow> upper_bound1(schema.NewRow()); |
| ASSERT_OK(upper_bound1->SetInt32("key_range", 3)); |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTableName) |
| .schema(&schema) |
| .add_hash_partitions({"key_hash0"}, 2) |
| .add_hash_partitions({"key_hash1", "key_hash2"}, 3) |
| .set_range_partition_columns({"key_range"}) |
| .add_range_partition(lower_bound0.release(), upper_bound0.release()) |
| .add_range_partition(lower_bound1.release(), upper_bound1.release()) |
| .num_replicas(FLAGS_num_replicas) |
| .Create()); |
| } |
| |
| vector<string> new_tablet_ids; |
| ListRunningTabletIds(tservers.front(), kTimeout, &new_tablet_ids); |
| vector<string> delta_tablet_ids; |
| for (auto& tablet_id : base_tablet_ids) { |
| if (std::find(new_tablet_ids.begin(), new_tablet_ids.end(), tablet_id) == |
| new_tablet_ids.end()) { |
| delta_tablet_ids.push_back(tablet_id); |
| } |
| } |
| |
| // Test the list tablet with partition output. |
| string stdout; |
| string stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "list", |
| "--list_tablets", |
| Substitute("--show_tablet_partition_info=$0", std::get<0>(GetParam())), |
| Substitute("--show_hash_partition_info=$0", std::get<1>(GetParam())), |
| Substitute("--list_table_output_format=$0", std::get<2>(GetParam())), |
| "--tables", |
| kTableName, |
| cluster_->master()->bound_rpc_addr().ToString(), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kTableName, &table)); |
| const auto& partition_schema = table->partition_schema(); |
| const auto& schema_internal = KuduSchema::ToSchema(table->schema()); |
| |
| // make sure table name correct |
| ASSERT_STR_CONTAINS(stdout, kTableName); |
| |
| master::ListTablesResponsePB tables_info; |
| ASSERT_OK(ListTablesWithInfo( |
| cluster_->master_proxy(), kTableName, kTimeout, &tables_info)); |
| for (const auto& table : tables_info.tables()) { |
| for (const auto& pt : table.tablet_with_partition()) { |
| Partition partition; |
| Partition::FromPB(pt.partition(), &partition); |
| string partition_str; |
| if (show_tp) { |
| partition_str = " : " + partition_schema.PartitionDebugString(partition, |
| schema_internal, |
| show_hp); |
| } |
| string tablet_with_partition = pt.tablet_id() + partition_str; |
| if (std::get<2>(GetParam()) == "pretty") { |
| ASSERT_STR_CONTAINS(stdout, tablet_with_partition); |
| } |
| } |
| } |
| |
| if (std::get<2>(GetParam()) == "pretty") { |
| ASSERT_STR_CONTAINS(stdout, Substitute("$0", kTableName)); |
| |
| for (auto& tablet_id : delta_tablet_ids) { |
| ASSERT_STR_CONTAINS(stdout, tablet_id); |
| } |
| |
| for (auto& ts : tservers) { |
| ASSERT_STR_CONTAINS(stdout, ts->uuid()); |
| ASSERT_STR_CONTAINS(stdout, ts->uuid()); |
| } |
| } else if (std::get<2>(GetParam()) == "json") { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\": \"$0\"", kTableName)); |
| |
| for (auto& tablet_id : delta_tablet_ids) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\": \"$0\"", tablet_id)); |
| } |
| |
| for (auto& ts : tservers) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\": \"$0\"", ts->uuid())); |
| } |
| } else if (std::get<2>(GetParam()) == "json_compact") { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"name\":\"$0\"", kTableName)); |
| |
| for (auto& tablet_id : delta_tablet_ids) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"tablet_id\":\"$0\"", tablet_id)); |
| } |
| |
| for (auto& ts : tservers) { |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"uuid\":\"$0\"", ts->uuid())); |
| } |
| } else { |
| FAIL() << "unexpected table list format" << std::get<2>(GetParam()); |
| } |
| } |
| |
| TEST_F(AdminCliTest, TestLocateRow) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| // Test an OK case. Not much going on here since the table has only one |
| // tablet, which covers the whole universe. |
| string stdout, stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "locate_row", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableId, |
| "[-1]" |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| // Grab list of tablet_ids from the tserver and check the output. |
| vector<TServerDetails*> tservers; |
| vector<string> tablet_ids; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ListRunningTabletIds(tservers.front(), |
| MonoDelta::FromSeconds(30), |
| &tablet_ids); |
| ASSERT_EQ(1, tablet_ids.size()); |
| ASSERT_STR_CONTAINS(stdout, tablet_ids[0]); |
| |
| // Test a few error cases. |
| const auto check_bad_input = [&](const string& json, const string& error) { |
| string out, err; |
| Status s = RunKuduTool({ |
| "table", |
| "locate_row", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kTableId, |
| json, |
| }, &out, &err); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_STR_CONTAINS(err, error); |
| }; |
| |
| // String instead of int. |
| NO_FATALS(check_bad_input("[\"foo\"]", "unable to parse")); |
| |
| // Float instead of int. |
| NO_FATALS(check_bad_input("[1.2]", "unable to parse")); |
| |
| // Overflow (recall the key is INT32). |
| NO_FATALS(check_bad_input( |
| Substitute("[$0]", std::to_string(std::numeric_limits<int64_t>::max())), |
| "out of range")); |
| } |
| |
| TEST_F(AdminCliTest, TestLocateRowMore) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| // Make a complex schema with multiple columns in the primary key, hash and |
| // range partitioning, and non-covered ranges. |
| const string kAnotherTableId = "TestAnotherTable"; |
| KuduSchema schema; |
| |
| // Build the schema. |
| KuduSchemaBuilder builder; |
| builder.AddColumn("key_hash")->Type(KuduColumnSchema::STRING)->NotNull(); |
| builder.AddColumn("key_range")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.SetPrimaryKey({ "key_hash", "key_range" }); |
| ASSERT_OK(builder.Build(&schema)); |
| |
| // Set up partitioning and create the table. |
| unique_ptr<KuduPartialRow> lower_bound0(schema.NewRow()); |
| ASSERT_OK(lower_bound0->SetInt32("key_range", 0)); |
| unique_ptr<KuduPartialRow> upper_bound0(schema.NewRow()); |
| ASSERT_OK(upper_bound0->SetInt32("key_range", 1)); |
| unique_ptr<KuduPartialRow> lower_bound1(schema.NewRow()); |
| ASSERT_OK(lower_bound1->SetInt32("key_range", 2)); |
| unique_ptr<KuduPartialRow> upper_bound1(schema.NewRow()); |
| ASSERT_OK(upper_bound1->SetInt32("key_range", 3)); |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kAnotherTableId) |
| .schema(&schema) |
| .add_hash_partitions({ "key_hash" }, 2) |
| .set_range_partition_columns({ "key_range" }) |
| .add_range_partition(lower_bound0.release(), upper_bound0.release()) |
| .add_range_partition(lower_bound1.release(), upper_bound1.release()) |
| .num_replicas(FLAGS_num_replicas) |
| .Create()); |
| |
| vector<TServerDetails*> tservers; |
| vector<string> tablet_ids; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ListRunningTabletIds(tservers.front(), |
| MonoDelta::FromSeconds(30), |
| &tablet_ids); |
| std::unordered_set<string> tablet_id_set(tablet_ids.begin(), tablet_ids.end()); |
| |
| // Since there isn't a great alternative way to validate the answer the tool |
| // gives, and the scan token code underlying the implementation is extensively |
| // tested, we won't overexert ourselves checking correctness, and instead just |
| // do sanity checks and tool usability checks. |
| string stdout, stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "locate_row", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kAnotherTableId, |
| "[\"foo bar\",0]" |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| StripWhiteSpace(&stdout); |
| const auto tablet_id_for_0 = stdout; |
| ASSERT_TRUE(ContainsKey(tablet_id_set, tablet_id_for_0)) |
| << "expected to find tablet id " << tablet_id_for_0; |
| |
| // A row in a different range partition should be in a different tablet. |
| stdout.clear(); |
| stderr.clear(); |
| s = RunKuduTool({ |
| "table", |
| "locate_row", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kAnotherTableId, |
| "[\"foo\",2]" |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| StripWhiteSpace(&stdout); |
| const auto tablet_id_for_2 = stdout; |
| ASSERT_TRUE(ContainsKey(tablet_id_set, tablet_id_for_0)) |
| << "expected to find tablet id " << tablet_id_for_2; |
| ASSERT_NE(tablet_id_for_0, tablet_id_for_2); |
| |
| // Test a few error cases. |
| const auto check_bad_input = [&](const string& json, const string& error) { |
| string out, err; |
| Status s = RunKuduTool({ |
| "table", |
| "locate_row", |
| cluster_->master()->bound_rpc_addr().ToString(), |
| kAnotherTableId, |
| json, |
| }, &out, &err); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_STR_CONTAINS(err, error); |
| }; |
| |
| // Test locating a row lying in a non-covered range. |
| NO_FATALS(check_bad_input( |
| "[\"foo\",1]", |
| "row does not belong to any currently existing tablet")); |
| |
| // Test providing a missing or incomplete primary key. |
| NO_FATALS(check_bad_input( |
| "[]", |
| "wrong number of key columns specified: expected 2 but received 0")); |
| NO_FATALS(check_bad_input( |
| "[\"foo\"]", |
| "wrong number of key columns specified: expected 2 but received 1")); |
| |
| // Test providing too many key column values. |
| NO_FATALS(check_bad_input( |
| "[\"foo\",2,\"bar\"]", |
| "wrong number of key columns specified: expected 2 but received 3")); |
| |
| // Test providing an invalid value for a key column when there's multiple |
| // key columns. |
| NO_FATALS(check_bad_input("[\"foo\",\"bar\"]", "unable to parse")); |
| |
| // Test providing bad json. |
| NO_FATALS(check_bad_input("[", "JSON text is corrupt")); |
| NO_FATALS(check_bad_input("[\"foo\",]", "JSON text is corrupt")); |
| |
| // Test providing valid JSON that's not an array. |
| NO_FATALS(check_bad_input( |
| "{ \"key_hash\" : \"foo\", \"key_range\" : 2 }", |
| "wrong type during field extraction: expected object array")); |
| } |
| |
// Verify the -check_row_existence flag of 'kudu table locate_row': the tool
// must report the hosting tablet but fail when the row is absent, and succeed
// (printing the row) once the row has been inserted.
TEST_F(AdminCliTest, TestLocateRowAndCheckRowPresence) {
  FLAGS_num_tablet_servers = 1;
  FLAGS_num_replicas = 1;

  NO_FATALS(BuildAndStart());

  // Grab list of tablet_ids from any tserver so we can check the output.
  vector<TServerDetails*> tservers;
  vector<string> tablet_ids;
  AppendValuesFromMap(tablet_servers_, &tservers);
  ListRunningTabletIds(tservers.front(),
                       MonoDelta::FromSeconds(30),
                       &tablet_ids);
  ASSERT_EQ(1, tablet_ids.size());
  const string& expected_tablet_id = tablet_ids[0];

  // Test the case when the row does not exist: the tablet is still located
  // and printed, but the tool exits with an error.
  string stdout, stderr;
  Status s = RunKuduTool({
    "table",
    "locate_row",
    cluster_->master()->bound_rpc_addr().ToString(),
    kTableId,
    "[0]",
    "-check_row_existence",
  }, &stdout, &stderr);
  ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, stdout, stderr);
  ASSERT_STR_CONTAINS(stdout, expected_tablet_id);
  ASSERT_STR_CONTAINS(stderr, "row does not exist");

  // Insert row with key = 0.
  // NOTE(review): CreateClient() is a test-base helper; presumably it asserts
  // internally on failure — verify against the fixture's definition.
  client::sp::shared_ptr<KuduClient> client;
  CreateClient(&client);
  client::sp::shared_ptr<KuduTable> table;
  ASSERT_OK(client->OpenTable(kTableId, &table));
  unique_ptr<KuduInsert> insert(table->NewInsert());
  auto* row = insert->mutable_row();
  ASSERT_OK(row->SetInt32("key", 0));
  ASSERT_OK(row->SetInt32("int_val", 12345));
  ASSERT_OK(row->SetString("string_val", "hello"));
  // Capture the row's string representation before releasing the insert so
  // it can be matched against the tool's output below.
  const string row_str = row->ToString();
  auto session = client->NewSession();
  ASSERT_OK(session->Apply(insert.release()));
  ASSERT_OK(session->Flush());
  ASSERT_OK(session->Close());

  // Test the case when the row exists. Since the scan is done by a subprocess
  // using a different client instance, it's possible the scan will not
  // immediately retrieve the row even though the write has already succeeded.
  // This use case is what timestamp propagation is for, but there's no way to
  // propagate a timestamp to a tool (and there shouldn't be). Instead, we
  // ASSERT_EVENTUALLY.
  ASSERT_EVENTUALLY([&]() {
    stdout.clear();
    stderr.clear();
    s = RunKuduTool({
      "table",
      "locate_row",
      cluster_->master()->bound_rpc_addr().ToString(),
      kTableId,
      "[0]",
      "-check_row_existence",
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_CONTAINS(stdout, expected_tablet_id);
    ASSERT_STR_CONTAINS(stdout, row_str);
  });
}
| |
| TEST_F(AdminCliTest, TestDumpMemTrackers) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| // Grab list of tablet_ids from any tserver so we can check the output. |
| vector<TServerDetails*> tservers; |
| vector<string> tablet_ids; |
| AppendValuesFromMap(tablet_servers_, &tservers); |
| ListRunningTabletIds(tservers.front(), |
| MonoDelta::FromSeconds(30), |
| &tablet_ids); |
| ASSERT_EQ(1, tablet_ids.size()); |
| const string& tablet_id = tablet_ids[0]; |
| |
| // The tool should work against the master. |
| string stdout, stderr; |
| Status s = RunKuduTool({ |
| "master", |
| "dump_memtrackers", |
| cluster_->master()->bound_rpc_hostport().ToString(), |
| "-format=csv", |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| // The memtrackers dump from the master should contain a tracker for the |
| // systablet that has 'server' as its parent. |
| ASSERT_STR_CONTAINS( |
| stdout, |
| Substitute("tablet-$0,server", |
| master::SysCatalogTable::kSysCatalogTabletId)); |
| |
| // The tool should work against the tablet server. |
| stdout.clear(); |
| stderr.clear(); |
| s = RunKuduTool({ |
| "tserver", |
| "dump_memtrackers", |
| cluster_->tablet_server(0)->bound_rpc_hostport().ToString(), |
| "-memtracker_output=json_compact", |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| // The memtrackers dump from the tablet server should contain a tracker for |
| // the tablet and some tracker that is a child of that tracker. |
| const string tablet_tracker_id = Substitute("tablet-$0", tablet_id); |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"id\":\"$0\"", tablet_tracker_id)); |
| ASSERT_STR_CONTAINS(stdout, Substitute("\"parent_id\":\"$0\"", tablet_tracker_id)); |
| } |
| |
| TEST_F(AdminCliTest, TestAuthzResetCacheIncorrectMasterAddressList) { |
| ExternalMiniClusterOptions opts; |
| opts.num_masters = 3; |
| cluster_.reset(new ExternalMiniCluster(std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| auto master_addrs_str = HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs()); |
| auto incorrect_master_addrs_str = cluster_->master(0)->bound_rpc_hostport().ToString(); |
| string out; |
| string err; |
| Status s; |
| |
| s = RunKuduTool({ |
| "master", |
| "authz_cache", |
| "refresh", |
| incorrect_master_addrs_str, |
| }, &out, &err); |
| ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err); |
| |
| const auto ref_err_msg = Substitute( |
| "Invalid argument: list of master addresses provided ($0) " |
| "does not match the actual cluster configuration ($1)", |
| incorrect_master_addrs_str, master_addrs_str); |
| ASSERT_STR_CONTAINS(err, ref_err_msg); |
| |
| // However, the '--force' option makes it possible to run the tool even |
| // if the specified list of master addresses does not match the actual |
| // list of master addresses in the cluster. |
| out.clear(); |
| err.clear(); |
| ASSERT_OK(RunKuduTool({ |
| "master", |
| "authz_cache", |
| "refresh", |
| "--force", |
| incorrect_master_addrs_str, |
| }, &out, &err)); |
| } |
| |
| TEST_F(AdminCliTest, TestAuthzResetCacheNotAuthorized) { |
| vector<string> master_flags{ "--superuser_acl=no-such-user" }; |
| NO_FATALS(BuildAndStart({}, master_flags)); |
| |
| // The tool should report an error: it's not possible to reset the cache |
| // since the OS user under which the tools is invoked is not a superuser/admin |
| // (the --superuser_acl flag is set to contain a non-existent user only). |
| string out; |
| string err; |
| Status s = RunKuduTool({ |
| "master", |
| "authz_cache", |
| "refresh", |
| cluster_->master()->bound_rpc_hostport().ToString(), |
| }, &out, &err); |
| ASSERT_TRUE(s.IsRuntimeError()) << ToolRunInfo(s, out, err); |
| ASSERT_STR_CONTAINS(err, |
| "Remote error: Not authorized: unauthorized access to method: " |
| "RefreshAuthzCache"); |
| } |
| |
// Round-trip test of 'kudu table set_extra_config'/'get_extra_configs':
// set a table-level extra configuration and read it back with and without
// the -config_names filter.
TEST_F(AdminCliTest, TestExtraConfig) {
  NO_FATALS(BuildAndStart());

  string master_address = cluster_->master()->bound_rpc_addr().ToString();

  // Gets extra-configs when no extra config set: only the empty table
  // header is printed.
  {
    string stdout, stderr;
    Status s = RunKuduTool({
      "table",
      "get_extra_configs",
      master_address,
      kTableId
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_EQ(stdout, " Configuration | Value\n"
                      "---------------+-------\n");
  }

  // Sets "kudu.table.history_max_age_sec" to 3600.
  {
    ASSERT_TOOL_OK(
      "table",
      "set_extra_config",
      master_address,
      kTableId,
      "kudu.table.history_max_age_sec",
      "3600"
    );
  }

  // Gets all extra-configs.
  {
    string stdout, stderr;
    Status s = RunKuduTool({
      "table",
      "get_extra_configs",
      master_address,
      kTableId,
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_CONTAINS(stdout, "kudu.table.history_max_age_sec | 3600");
  }

  // Gets the specified extra-config, the configuration exists.
  {
    string stdout, stderr;
    Status s = RunKuduTool({
      "table",
      "get_extra_configs",
      master_address,
      kTableId,
      "-config_names=kudu.table.history_max_age_sec"
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_STR_CONTAINS(stdout, "kudu.table.history_max_age_sec | 3600");
  }

  // Gets duplicate extra-config names: the configuration must be listed
  // only once in the output.
  {
    string stdout, stderr;
    Status s = RunKuduTool({
      "table",
      "get_extra_configs",
      master_address,
      kTableId,
      "-config_names=kudu.table.history_max_age_sec,kudu.table.history_max_age_sec"
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_EQ(stdout, " Configuration                  | Value\n"
                      "--------------------------------+-------\n"
                      " kudu.table.history_max_age_sec | 3600\n");
  }

  // Gets the specified extra-config, the configuration doesn't exist:
  // only the empty table header is printed.
  {
    string stdout, stderr;
    Status s = RunKuduTool({
      "table",
      "get_extra_configs",
      master_address,
      kTableId,
      "-config_names=foobar"
    }, &stdout, &stderr);
    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
    ASSERT_EQ(stdout, " Configuration | Value\n"
                      "---------------+-------\n");
  }
}
| |
// Insert 'num_rows' rows into the named table, with keys in the range
// [start_key, start_key + num_rows). The other two columns of the table are
// set to fixed values ('int_val' = 12345, 'string_val' = "hello"). If an
// inserted key falls outside the table's range partitions, Apply() reports
// an IOError; this function then verifies the pending per-row error carries
// a NotFound status and returns Status::NotFound to the caller.
static Status InsertTestRows(const client::sp::shared_ptr<KuduClient>& client,
                             const string& table_name,
                             int start_key,
                             int num_rows) {
  client::sp::shared_ptr<KuduTable> table;
  RETURN_NOT_OK(client->OpenTable(table_name, &table));
  auto session = client->NewSession();
  for (int i = start_key; i < num_rows + start_key; ++i) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    auto* row = insert->mutable_row();
    RETURN_NOT_OK(row->SetInt32("key", i));
    RETURN_NOT_OK(row->SetInt32("int_val", 12345));
    RETURN_NOT_OK(row->SetString("string_val", "hello"));
    Status result = session->Apply(insert.release());
    if (result.IsIOError()) {
      // Apply() surfaces per-row failures as IOError; pull the pending error
      // to confirm the row was rejected due to a non-covered range partition.
      vector<kudu::client::KuduError*> errors;
      ElementDeleter drop(&errors);
      bool overflowed;
      session->GetPendingErrors(&errors, &overflowed);
      EXPECT_FALSE(overflowed);
      EXPECT_EQ(1, errors.size());
      EXPECT_TRUE(errors[0]->status().IsNotFound());
      return Status::NotFound(Substitute("No range partition covers the insert value $0", i));
    }
    RETURN_NOT_OK(result);
  }
  RETURN_NOT_OK(session->Flush());
  RETURN_NOT_OK(session->Close());
  return Status::OK();
}
| |
| TEST_F(AdminCliTest, TestAddAndDropUnboundedPartition) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| client::sp::shared_ptr<KuduTable> table; |
| |
| // At first, the range partition is unbounded, we can insert any data into it. |
| // We insert 100 rows with key range from 0 to 100, now there are 100 rows. |
| int num_rows = 100; |
| NO_FATALS(InsertTestRows(client_, kTableId, 0, num_rows)); |
| ASSERT_OK(client_->OpenTable(kTableId, &table)); |
| ASSERT_EQ(100, CountTableRows(table.get())); |
| |
| // For the unbounded range partition table, add any range partition will |
| // conflict with it, so we need to drop unbounded range partition first and |
| // then add range partition for it. Since the table is unbounded, it will |
| // drop all rows when dropping range partition. After dropping there will |
| // be 0 rows left. |
| string stdout, stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "drop_range_partition", |
| master_addr, |
| kTableId, |
| "[]", |
| "[]", |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_OK(client_->OpenTable(kTableId, &table)); |
| ASSERT_EQ(0, CountTableRows(table.get())); |
| }); |
| |
| // Since the unbounded partition has been dropped, now we can add a new unbounded |
| // range partition for the table. |
| s = RunKuduTool({ |
| "table", |
| "add_range_partition", |
| master_addr, |
| kTableId, |
| "[]", |
| "[]", |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| // Insert 100 rows with key range from 0 to 100, |
| // now there are 100 rows again. |
| NO_FATALS(InsertTestRows(client_, kTableId, 0, num_rows)); |
| ASSERT_OK(client_->OpenTable(kTableId, &table)); |
| ASSERT_EQ(100, CountTableRows(table.get())); |
| } |
| |
| TEST_F(AdminCliTest, TestAddAndDropRangePartition) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| // The kTableId's range partition is unbounded, so we need to create another table to |
| // add or drop range partition. |
| const string kTestTableName = "TestTable0"; |
| const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| |
| // Build the schema. |
| KuduSchema schema; |
| KuduSchemaBuilder builder; |
| builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->NotNull(); |
| builder.SetPrimaryKey({ "key" }); |
| ASSERT_OK(builder.Build(&schema)); |
| |
| // Set up partitioning and create the table. |
| unique_ptr<KuduPartialRow> lower_bound(schema.NewRow()); |
| ASSERT_OK(lower_bound->SetInt32("key", 0)); |
| unique_ptr<KuduPartialRow> upper_bound(schema.NewRow()); |
| ASSERT_OK(upper_bound->SetInt32("key", 100)); |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTestTableName) |
| .schema(&schema) |
| .set_range_partition_columns({ "key" }) |
| .add_range_partition(lower_bound.release(), upper_bound.release()) |
| .num_replicas(FLAGS_num_replicas) |
| .Create()); |
| |
| client::sp::shared_ptr<KuduTable> table; |
| |
| // Lambda function to add range partition using kudu CLI. |
| const auto add_range_partition_using_CLI = [&] (const string& lower_bound_json, |
| const string& upper_bound_json, |
| const string& lower_bound_type, |
| const string& upper_bound_type) { |
| string error, out; |
| Status s = RunKuduTool({ |
| "table", |
| "add_range_partition", |
| master_addr, |
| kTestTableName, |
| lower_bound_json, |
| upper_bound_json, |
| Substitute("-lower_bound_type=$0", lower_bound_type), |
| Substitute("-upper_bound_type=$0", upper_bound_type), |
| }, &out, &error); |
| return s; |
| }; |
| |
| // Lambda function to drop range partition using kudu CLI. |
| const auto drop_range_partition_using_CLI = [&] (const string& lower_bound_json, |
| const string& upper_bound_json, |
| const string& lower_bound_type, |
| const string& upper_bound_type) { |
| string error, out; |
| Status s = RunKuduTool({ |
| "table", |
| "drop_range_partition", |
| master_addr, |
| kTestTableName, |
| lower_bound_json, |
| upper_bound_json, |
| Substitute("-lower_bound_type=$0", lower_bound_type), |
| Substitute("-upper_bound_type=$0", upper_bound_type), |
| }, &out, &error); |
| return s; |
| }; |
| |
| const auto check_bounds = [&] (const string& lower_bound, |
| const string& upper_bound, |
| const string& lower_bound_type, |
| const string& upper_bound_type, |
| int start_row_to_insert, |
| int num_rows_to_insert, |
| vector<int> out_of_range_rows_to_insert) { |
| string lower_bound_type_internal = lower_bound_type.empty() ? "INCLUSIVE_BOUND" : |
| lower_bound_type; |
| |
| string upper_bound_type_internal = upper_bound_type.empty() ? "EXCLUSIVE_BOUND" : |
| upper_bound_type; |
| |
| // Add range partition. |
| ASSERT_OK(add_range_partition_using_CLI(lower_bound, upper_bound, |
| lower_bound_type_internal, |
| upper_bound_type_internal)); |
| |
| // Insert num_rows_to_insert rows to table. |
| ASSERT_OK(InsertTestRows(client_, kTestTableName, start_row_to_insert, |
| num_rows_to_insert)); |
| |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| ASSERT_EQ(num_rows_to_insert, CountTableRows(table.get())); |
| |
| // Insert rows outside range partition, |
| // which will return error in lambda as we expect. |
| for (auto& value: out_of_range_rows_to_insert) { |
| EXPECT_TRUE(InsertTestRows(client_, kTestTableName, value, 1).IsNotFound()); |
| } |
| |
| // Drop range partition. |
| ASSERT_OK(drop_range_partition_using_CLI(lower_bound, upper_bound, |
| lower_bound_type_internal, |
| upper_bound_type_internal)); |
| |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| // Verify no rows are left. |
| ASSERT_EQ(0, CountTableRows(table.get())); |
| }); |
| }; |
| |
| { |
| // Test specifying the range bound type in lower-case. |
| |
| // Drop the range partition added when create table, the range partition is |
| // [0,100). Insert 100 rows, data range is 0-99. Now there are 100 rows. |
| NO_FATALS(InsertTestRows(client_, kTestTableName, 0, 100)); |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| ASSERT_EQ(100, CountTableRows(table.get())); |
| |
| // Drop range partition of [0,100) by command line, now there are 0 rows left. |
| ASSERT_OK(drop_range_partition_using_CLI("[0]", "[100]", "inclusive_bound", |
| "exclusive_bound")); |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| ASSERT_EQ(0, CountTableRows(table.get())); |
| }); |
| } |
| |
| { |
| // Test adding (INCLUSIVE_BOUND, EXCLUSIVE_BOUND) range partition. |
| |
| // Adding [100,200), 100 is inclusive and 200 is exclusive. Then insert |
| // 100 rows , the data range is [100-199]. Insert 99 and 200 to test |
| // insert out of range row. Last, dropping the range partition and checking |
| // that there are 0 rows left. |
| check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "EXCLUSIVE_BOUND", 100, 100, |
| { 99, 200 }); |
| } |
| |
| { |
| // Test adding (INCLUSIVE_BOUND, INCLUSIVE_BOUND) range partition. |
| |
| // Adding [100,200], both 100 and 200 are inclusive. Then insert 101 |
| // rows, the data range is [100,200]. Insert 99 and 201 to test insert |
| // out of range row. Last, dropping the range partition and checking |
| // that there are 0 rows left. |
| check_bounds("[100]", "[200]", "INCLUSIVE_BOUND", "INCLUSIVE_BOUND", 100, 101, |
| { 99, 201 }); |
| } |
| |
| |
| { |
| // Test adding (EXCLUSIVE_BOUND, INCLUSIVE_BOUND) range partition. |
| |
| // Adding (100,200], 100 is exclusive while 200 is inclusive.Then insert |
| // 100 rows, the data range is (100,200]. Insert 100 and 201 to test |
| // insert out of range row. Last, dropping the range partition and checking |
| // that there are 0 rows left. |
| check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "INCLUSIVE_BOUND", 101, 100, |
| { 100, 201 }); |
| } |
| |
| { |
| // Test adding (EXCLUSIVE_BOUND, EXCLUSIVE_BOUND) range partition. |
| |
| // Adding (100,200), both 100 and 200 are exclusive.Then insert 99 rows, |
| // the data range is (100,200). Insert 100 and 200 to test insert out of |
| // range row. Last, dropping the range partition and checking that there |
| // are 0 rows left. |
| check_bounds("[100]", "[200]", "EXCLUSIVE_BOUND", "EXCLUSIVE_BOUND", 101, 99, |
| { 100, 200 }); |
| } |
| |
| { |
| // Test adding (INCLUSIVE_BOUND, UNBOUNDED) range partition. |
| |
| // Adding (1,unbouded), lower range bound is 1, upper range bound is unbounded, |
| // 1 is inclusive. Then insert 100 rows, the data range is [1-100]. Insert 0 |
| // to test insert out of range row. Last, dropping the range partition and |
| // checking that there are 0 rows left. |
| check_bounds("[1]", "[]", "INCLUSIVE_BOUND", "", 1, 100, |
| { 0 }); |
| } |
| |
| { |
| // Test adding (EXCLUSIVE_BOUND, UNBOUNDED) range partition. |
| |
| // Adding (0,unbouded), lower range bound is 0, upper range bound is unbounded, |
| // 0 is exclusive. Then insert 100 rows, the data range |
| // is [2-101]. Insert 1 to test insert out of range row. Last, dropping the range |
| // partition and checking that there are 0 rows left. |
| check_bounds("[1]", "[]", "EXCLUSIVE_BOUND", "", 2, 100, |
| { 1 }); |
| } |
| |
| { |
| // Test adding (UNBOUNDED, INCLUSIVE_BOUND) range partition. |
| |
| // Adding (unbouded,100), lower range bound unbound, upper range bound is 100, |
| // 100 is inclusive. Then insert 100 rows, the data range |
| // is [1-100]. Insert 101 to test insert out of range row. Last, dropping the range |
| // partition and checking that there are 0 rows left. |
| check_bounds("[]", "[100]", "", "INCLUSIVE_BOUND", 1, 100, |
| { 101 }); |
| } |
| |
| { |
| // Test adding (UNBOUNDED, EXCLUSIVE_BOUND) range partition. |
| |
| // Adding (unbouded,100), lower range bound unbound, upper range bound is 100, |
| // 100 is exclusive. Then insert 100 rows, the data range |
| // is [0-99]. Insert 100 to test insert out of range row. Last, dropping the range |
| // partition and checking that there are 0 rows left. |
| check_bounds("[]", "[100]", "", "EXCLUSIVE_BOUND", 0, 100, |
| { 100 }); |
| } |
| } |
| |
| TEST_F(AdminCliTest, TestAddAndDropRangePartitionWithWrongParameters) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| const string kTestTableName = "TestTable1"; |
| |
| // Build the schema. |
| KuduSchema schema; |
| KuduSchemaBuilder builder; |
| builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.SetPrimaryKey({ "key" }); |
| ASSERT_OK(builder.Build(&schema)); |
| |
| unique_ptr<KuduPartialRow> lower_bound(schema.NewRow()); |
| ASSERT_OK(lower_bound->SetInt32("key", 0)); |
| unique_ptr<KuduPartialRow> upper_bound(schema.NewRow()); |
| ASSERT_OK(upper_bound->SetInt32("key", 1)); |
| |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTestTableName) |
| .schema(&schema) |
| .set_range_partition_columns({ "key" }) |
| .add_range_partition(lower_bound.release(), upper_bound.release()) |
| .num_replicas(FLAGS_num_replicas) |
| .Create()); |
| |
| // Lambda function to check bad input, the function will return |
| // OK if running tool to add range partition return RuntimeError, |
| // which as we expect. |
| const auto check_bad_input = [&](const string& lower_bound_json, |
| const string& upper_bound_json, |
| const string& lower_bound_type, |
| const string& upper_bound_type, |
| const string& error) { |
| string out, err; |
| Status s = RunKuduTool({ |
| "table", |
| "add_range_partition", |
| master_addr, |
| kTestTableName, |
| lower_bound_json, |
| upper_bound_json, |
| Substitute("-lower_bound_type=$0", lower_bound_type), |
| Substitute("-upper_bound_type=$0", upper_bound_type), |
| }, &out, &err); |
| ASSERT_TRUE(s.IsRuntimeError()); |
| ASSERT_STR_CONTAINS(err, error); |
| }; |
| |
| // Test providing wrong type of range lower bound type, it will return error. |
| NO_FATALS(check_bad_input("[]", "[]", "test_lower_bound_type", |
| "EXCLUSIVE_BOUND", |
| "wrong type of range lower bound")); |
| |
| // Test providing wrong type of range upper bound type, it will return error. |
| NO_FATALS(check_bad_input("[]", "[]", "INCLUSIVE_BOUND", |
| "test_upper_bound_type", |
| "wrong type of range upper bound")); |
| |
| // Test providing wrong number of range values, it will return error. |
| NO_FATALS(check_bad_input("[1,2]", "[3]", "INCLUSIVE_BOUND", |
| "EXCLUSIVE_BOUND", |
| "wrong number of range columns specified: " |
| "expected 1 but received 2")); |
| |
| // Test providing wrong type of range partition key: string instead of int, |
| // it will return error. |
| NO_FATALS(check_bad_input("[\"hello\"]", "[\"world\"]", "INCLUSIVE_BOUND", |
| "EXCLUSIVE_BOUND", |
| "unable to parse value")); |
| |
| // Test providing incomplete json array of range bound, it will return error. |
| NO_FATALS(check_bad_input("[", "[2]", "INCLUSIVE_BOUND", |
| "EXCLUSIVE_BOUND", |
| "JSON text is corrupt")); |
| |
| // Test providing wrong json array format of range bound, it will return error. |
| NO_FATALS(check_bad_input("[1,]", "[2]", "INCLUSIVE_BOUND", |
| "EXCLUSIVE_BOUND", |
| "JSON text is corrupt")); |
| |
| // Test providing wrong JSON that's not an array, it will return error. |
| NO_FATALS(check_bad_input( |
| "{ \"key\" : 1}", "{\"key\" : 2 }", "INCLUSIVE_BOUND", |
| "EXCLUSIVE_BOUND", |
| "wrong type during field extraction: expected object array")); |
| } |
| |
| TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| const string kTestTableName = "TestTable2"; |
| |
| { |
| // Build the schema. |
| KuduSchema schema; |
| KuduSchemaBuilder builder; |
| builder.AddColumn("key_INT8")->Type(KuduColumnSchema::INT8)->NotNull(); |
| builder.AddColumn("key_INT16")->Type(KuduColumnSchema::INT16)->NotNull(); |
| builder.AddColumn("key_INT32")->Type(KuduColumnSchema::INT32)->NotNull(); |
| builder.AddColumn("key_INT64")->Type(KuduColumnSchema::INT64)->NotNull(); |
| builder.AddColumn("key_UNIXTIME_MICROS")-> |
| Type(KuduColumnSchema::UNIXTIME_MICROS)->NotNull(); |
| builder.AddColumn("key_BINARY")->Type(KuduColumnSchema::BINARY)->NotNull(); |
| builder.AddColumn("key_STRING")->Type(KuduColumnSchema::STRING)->NotNull(); |
| builder.SetPrimaryKey({ "key_INT8", "key_INT16", "key_INT32", |
| "key_INT64", "key_UNIXTIME_MICROS", |
| "key_BINARY", "key_STRING" }); |
| ASSERT_OK(builder.Build(&schema)); |
| |
| // Init the range partition and create table. |
| unique_ptr<KuduPartialRow> lower_bound(schema.NewRow()); |
| ASSERT_OK(lower_bound->SetInt8("key_INT8", 0)); |
| ASSERT_OK(lower_bound->SetInt16("key_INT16", 1)); |
| ASSERT_OK(lower_bound->SetInt32("key_INT32", 2)); |
| ASSERT_OK(lower_bound->SetInt64("key_INT64", 3)); |
| ASSERT_OK(lower_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 4)); |
| ASSERT_OK(lower_bound->SetBinaryCopy("key_BINARY", "a")); |
| ASSERT_OK(lower_bound->SetString("key_STRING", "b")); |
| |
| unique_ptr<KuduPartialRow> upper_bound(schema.NewRow()); |
| ASSERT_OK(upper_bound->SetInt8("key_INT8", 5)); |
| ASSERT_OK(upper_bound->SetInt16("key_INT16", 6)); |
| ASSERT_OK(upper_bound->SetInt32("key_INT32", 7)); |
| ASSERT_OK(upper_bound->SetInt64("key_INT64", 8)); |
| ASSERT_OK(upper_bound->SetUnixTimeMicros("key_UNIXTIME_MICROS", 9)); |
| ASSERT_OK(upper_bound->SetBinaryCopy("key_BINARY", "c")); |
| ASSERT_OK(upper_bound->SetString("key_STRING", "d")); |
| |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTestTableName) |
| .schema(&schema) |
| .set_range_partition_columns({ "key_INT8", "key_INT16", "key_INT32", |
| "key_INT64", "key_UNIXTIME_MICROS", |
| "key_BINARY", "key_STRING" }) |
| .add_range_partition(lower_bound.release(), upper_bound.release()) |
| .num_replicas(FLAGS_num_replicas) |
| .Create()); |
| } |
| |
| // Add range partition use CLI. |
| string stdout, stderr; |
| Status s = RunKuduTool({ |
| "table", |
| "add_range_partition", |
| master_addr, |
| kTestTableName, |
| "[10, 11, 12, 13, 14, \"e\", \"f\"]", |
| "[15, 16, 17, 18, 19, \"g\", \"h\"]", |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| { |
| // Insert test row. |
| auto session = client_->NewSession(); |
| unique_ptr<KuduInsert> insert(table->NewInsert()); |
| auto* row = insert->mutable_row(); |
| ASSERT_OK(row->SetInt8("key_INT8", 10)); |
| ASSERT_OK(row->SetInt16("key_INT16", 11)); |
| ASSERT_OK(row->SetInt32("key_INT32", 12)); |
| ASSERT_OK(row->SetInt64("key_INT64", 13)); |
| ASSERT_OK(row->SetUnixTimeMicros("key_UNIXTIME_MICROS", 14)); |
| ASSERT_OK(row->SetBinaryCopy("key_BINARY", "e")); |
| ASSERT_OK(row->SetString("key_STRING", "f")); |
| ASSERT_OK(session->Apply(insert.release())); |
| ASSERT_OK(session->Flush()); |
| ASSERT_OK(session->Close()); |
| } |
| |
| // There is 1 row in table now. |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| ASSERT_EQ(1, CountTableRows(table.get())); |
| |
| // Drop range partition use CLI. |
| s = RunKuduTool({ |
| "table", |
| "drop_range_partition", |
| master_addr, |
| kTestTableName, |
| "[10, 11, 12, 13, 14, \"e\", \"f\"]", |
| "[15, 16, 17, 18, 19, \"g\", \"h\"]", |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| |
| // There are 0 rows left. |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| ASSERT_EQ(0, CountTableRows(table.get())); |
| }); |
| } |
| |
| TEST_F(AdminCliTest, AddAndDropRangeWithCustomHashSchema) { |
| FLAGS_num_tablet_servers = 1; |
| FLAGS_num_replicas = 1; |
| |
| NO_FATALS(BuildAndStart()); |
| |
| const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); |
| constexpr const char* const kTestTableName = "custom_hash_schemas"; |
| constexpr const char* const kC0 = "c0"; |
| constexpr const char* const kC1 = "c1"; |
| constexpr const char* const kC2 = "c2"; |
| |
| { |
| KuduSchemaBuilder builder; |
| builder.AddColumn(kC0)->Type(KuduColumnSchema::INT8)->NotNull(); |
| builder.AddColumn(kC1)->Type(KuduColumnSchema::INT16)->NotNull(); |
| builder.AddColumn(kC2)->Type(KuduColumnSchema::STRING); |
| builder.SetPrimaryKey({ kC0, kC1 }); |
| KuduSchema schema; |
| ASSERT_OK(builder.Build(&schema)); |
| |
| // Create a table with left-unbounded range partition having the |
| // table-wide hash schema. |
| unique_ptr<KuduPartialRow> l(schema.NewRow()); |
| unique_ptr<KuduPartialRow> u(schema.NewRow()); |
| ASSERT_OK(u->SetInt8(kC0, 0)); |
| |
| unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); |
| ASSERT_OK(table_creator->table_name(kTestTableName) |
| .schema(&schema) |
| .set_range_partition_columns({ kC0 }) |
| .add_hash_partitions({ kC1 }, 2) |
| .add_range_partition(l.release(), u.release()) |
| .num_replicas(FLAGS_num_replicas) |
| .Create()); |
| } |
| |
| string stdout; |
| string stderr; |
| |
| // Add a range partition with custom hash schema using the kudu CLI tool. |
| { |
| constexpr const char* const kHashSchemaJson = R"*({ |
| "hash_schema": [ |
| { "columns": ["c1"], "num_buckets": 5, "seed": 8 } |
| ] |
| })*"; |
| const auto s = RunKuduTool({ |
| "table", |
| "add_range_partition", |
| master_addr, |
| kTestTableName, |
| "[0]", |
| "[1]", |
| Substitute("--hash_schema=$0", kHashSchemaJson), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| } |
| { |
| const auto s = RunKuduTool({ |
| "table", |
| "describe", |
| master_addr, |
| kTestTableName, |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| ASSERT_STR_CONTAINS(stdout, |
| "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5"); |
| } |
| |
| // Insert a row into the newly added range partition. |
| { |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| |
| auto session = client_->NewSession(); |
| unique_ptr<KuduInsert> insert(table->NewInsert()); |
| auto* row = insert->mutable_row(); |
| ASSERT_OK(row->SetInt8(kC0, 0)); |
| ASSERT_OK(row->SetInt16(kC1, 0)); |
| ASSERT_OK(row->SetString(kC2, "0")); |
| ASSERT_OK(session->Apply(insert.release())); |
| ASSERT_OK(session->Flush()); |
| ASSERT_EQ(1, CountTableRows(table.get())); |
| } |
| |
| // Add unbounded range using the kudu CLI tool. |
| { |
| constexpr const char* const kHashSchemaJson = R"*({ |
| "hash_schema": [ { "columns": ["c0", "c1"], "num_buckets": 3 } ] })*"; |
| const auto s = RunKuduTool({ |
| "table", |
| "add_range_partition", |
| master_addr, |
| kTestTableName, |
| "[1]", |
| "[]", |
| Substitute("--hash_schema=$0", kHashSchemaJson), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| } |
| { |
| const auto s = RunKuduTool({ |
| "table", |
| "describe", |
| master_addr, |
| kTestTableName, |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| ASSERT_STR_CONTAINS(stdout, |
| "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5"); |
| ASSERT_STR_CONTAINS(stdout, |
| "PARTITION 1 <= VALUES HASH(c0, c1) PARTITIONS 3"); |
| } |
| |
| // Insert a row into the newly added range partition. |
| { |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| auto session = client_->NewSession(); |
| unique_ptr<KuduInsert> insert(table->NewInsert()); |
| auto* row = insert->mutable_row(); |
| ASSERT_OK(row->SetInt8(kC0, 10)); |
| ASSERT_OK(row->SetInt16(kC1, 10)); |
| ASSERT_OK(row->SetString(kC2, "10")); |
| ASSERT_OK(session->Apply(insert.release())); |
| ASSERT_OK(session->Flush()); |
| ASSERT_EQ(2, CountTableRows(table.get())); |
| } |
| |
| // Drop all the ranges one-by-one. |
| const vector<pair<string, string>> kRangesStr = { |
| {"", "0"}, {"0", "1"}, {"1", ""}, |
| }; |
| |
| for (const std::pair<string, string>& r : kRangesStr) { |
| SCOPED_TRACE(Substitute("range ['$0', '$1')", r.first, r.second)); |
| const auto s = RunKuduTool({ |
| "table", |
| "drop_range_partition", |
| master_addr, |
| kTestTableName, |
| Substitute("[$0]", r.first), |
| Substitute("[$0]", r.second), |
| }, &stdout, &stderr); |
| ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); |
| } |
| |
| // There should be 0 rows left. |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kTestTableName, &table)); |
| ASSERT_EVENTUALLY([&]() { |
| ASSERT_EQ(0, CountTableRows(table.get())); |
| }); |
| } |
| |
| namespace { |
| constexpr const char* kPrincipal = "oryx"; |
| |
| vector<string> RebuildMasterCmd(const ExternalMiniCluster& cluster, |
| int tserver_num, |
| bool is_secure, |
| bool log_to_stderr = false, |
| const string& tables = "", |
| const int& default_replica_num = 1) { |
| CHECK_GT(tserver_num, 0); |
| CHECK_LE(tserver_num, cluster.num_tablet_servers()); |
| vector<string> command = { |
| "master", |
| "unsafe_rebuild", |
| "-fs_data_dirs", |
| JoinStrings(cluster.master()->data_dirs(), ","), |
| "-fs_wal_dir", |
| cluster.master()->wal_dir(), |
| }; |
| if (!tables.empty()) { |
| command.emplace_back(Substitute("-tables=$0", tables)); |
| } |
| if (Env::Default()->IsEncryptionEnabled()) { |
| command.emplace_back("--encrypt_data_at_rest=true"); |
| } |
| if (log_to_stderr) { |
| command.emplace_back("--logtostderr"); |
| } |
| if (is_secure) { |
| command.emplace_back(Substitute("--sasl_protocol_name=$0", kPrincipal)); |
| } |
| for (int i = 0; i < tserver_num; i++) { |
| auto* ts = cluster.tablet_server(i); |
| command.emplace_back(ts->bound_rpc_hostport().ToString()); |
| } |
| command.emplace_back(Substitute("--default_num_replicas=$0", default_replica_num)); |
| return command; |
| } |
| |
| void MakeTestTable(const string& table_name, |
| int num_rows, |
| int num_replicas, |
| ExternalMiniCluster* cluster) { |
| TestWorkload workload(cluster); |
| workload.set_table_name(table_name); |
| workload.set_num_replicas(num_replicas); |
| workload.Setup(); |
| workload.Start(); |
| while (workload.rows_inserted() < num_rows) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| workload.StopAndJoin(); |
| } |
| } // anonymous namespace |
| |
| // Test failing to run the CLI because the master is non-empty. |
| TEST_F(AdminCliTest, TestRebuildMasterWhenNonEmpty) { |
| FLAGS_num_tablet_servers = 3; |
| NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false)); |
| |
| constexpr const char* kTable = "default.foo"; |
| NO_FATALS(MakeTestTable(kTable, /*num_rows*/10, /*num_replicas*/1, cluster_.get())); |
| NO_FATALS(cluster_->master()->Shutdown()); |
| string stdout; |
| string stderr; |
| Status s = RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, |
| /*is_secure*/false, /*log_to_stderr*/true), |
| &stdout, &stderr); |
| ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); |
| ASSERT_STR_CONTAINS(stderr, "must be empty"); |
| |
| // Delete the contents of the old master from disk. This should allow the |
| // tool to run. |
| ASSERT_OK(cluster_->master()->DeleteFromDisk()); |
| ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, |
| /*is_secure*/false, /*log_to_stderr*/true), |
| &stdout, &stderr)); |
| ASSERT_STR_NOT_CONTAINS(stderr, "must be empty"); |
| ASSERT_STR_CONTAINS(stdout, |
| "Rebuilt from 3 tablet servers, of which 0 had errors"); |
| ASSERT_STR_CONTAINS(stdout, "Rebuilt from 1 replicas, of which 0 had errors"); |
| } |
| |
| void delete_table_in_syscatalog(const string& wal_dir, |
| const std::vector<std::string>& data_dirs, |
| const string& del_table_name) { |
| MasterOptions opts; |
| opts.fs_opts.wal_root = wal_dir; |
| opts.fs_opts.data_roots = data_dirs; |
| Master master(opts); |
| ASSERT_OK(master.Init()); |
| SysCatalogTable sys_catalog(&master, &NoOpCb); |
| ASSERT_OK(sys_catalog.Load(master.fs_manager())); |
| // Get table from sys_catalog. |
| const auto kLeaderTimeout = MonoDelta::FromSeconds(10); |
| ASSERT_OK(sys_catalog.tablet_replica()->consensus()->WaitUntilLeader(kLeaderTimeout)); |
| TableInfoLoader table_info_loader; |
| sys_catalog.VisitTables(&table_info_loader); |
| scoped_refptr<TableInfo> table_info; |
| for (const auto& table : table_info_loader.tables) { |
| table->metadata().ReadLock(); |
| string table_name = table->metadata().state().name(); |
| table->metadata().ReadUnlock(); |
| if (table_name == del_table_name) { |
| table_info = table; |
| break; |
| } |
| } |
| ASSERT_NE(nullptr, table_info); |
| |
| // Get tablet from sys_catalog. |
| TabletInfoLoader tablet_info_loader; |
| sys_catalog.VisitTablets(&tablet_info_loader); |
| vector<scoped_refptr<TabletInfo>> tablets; |
| for (const auto& tablet : tablet_info_loader.tablets) { |
| tablet->metadata().ReadLock(); |
| if (tablet->metadata().state().pb.table_id() == table_info->id()) |
| tablets.push_back(tablet); |
| tablet->metadata().ReadUnlock(); |
| } |
| ASSERT_GT(tablets.size(), 0); |
| |
| // Delete one table and it's tablets. |
| TableMetadataLock l_table(table_info.get(), LockMode::WRITE); |
| TabletMetadataGroupLock l_tablets(LockMode::RELEASED); |
| l_tablets.AddMutableInfos(tablets); |
| l_tablets.Lock(LockMode::WRITE); |
| SysCatalogTable::Actions actions; |
| actions.table_to_delete = table_info; |
| actions.tablets_to_delete = tablets; |
| ASSERT_OK(sys_catalog.Write(actions)); |
| |
| NO_FATALS(sys_catalog.Shutdown()); |
| NO_FATALS(master.Shutdown()); |
| } |
| |
// Rebuild only a subset of the tables (not all of them). Exercises both
// "update" mode (the master still has catalog data for the tables) and
// "add" mode (the table's entry was removed from the sys_catalog).
TEST_F(AdminCliTest, TestRebuildTables) {
  FLAGS_num_tablet_servers = 3;
  NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false));
  // Create 3 tables.
  constexpr const char* kTable1 = "TestTable";
  NO_FATALS(MakeTestTable(kTable1, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));
  constexpr const char* kTable2 = "TestTable1";
  NO_FATALS(MakeTestTable(kTable2, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));
  constexpr const char* kTable3 = "TestTable2";
  NO_FATALS(MakeTestTable(kTable3, /*num_rows*/10, /*num_replicas*/1, cluster_.get()));

  // Comma-separated table list for the tool's -tables flag: only the first
  // two of the three tables are rebuilt.
  const string& part_tables = Substitute("$0,$1", kTable1, kTable2);
  NO_FATALS(cluster_->master()->Shutdown());

  string stdout1;
  string stderr1;
  // Rebuild 2 tables in update mode. Each of the two tables has a single
  // single-replica tablet, hence "2 replicas" in the expected summary.
  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
                                         /*is_secure*/false, /*log_to_stderr*/true,
                                         part_tables, /*default_replica_num*/1),
                        &stdout1, &stderr1));
  ASSERT_STR_CONTAINS(stdout1,
                      "Rebuilt from 3 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(stdout1, "Rebuilt from 2 replicas, of which 0 had errors");
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    cluster_->tablet_server(i)->Shutdown();
  }
  // Restart the cluster to check cluster healthy.
  cluster_->Restart();
  WaitForTSAndReplicas();
  ClusterVerifier cv1(cluster_.get());
  NO_FATALS(cv1.CheckCluster());

  NO_FATALS(cluster_->master()->Shutdown());
  // Delete kTable1 in syscatalog so the rebuild below has to re-add it.
  delete_table_in_syscatalog(cluster_->master()->wal_dir(),
                             cluster_->master()->data_dirs(),
                             kTable1);
  string stdout2;
  string stderr2;
  // Rebuild kTable1 in add mode.
  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
                                         /*is_secure*/false, /*log_to_stderr*/true, kTable1),
                        &stdout2, &stderr2));
  ASSERT_STR_NOT_CONTAINS(stderr2, "must be empty");
  ASSERT_STR_CONTAINS(stdout2,
                      "Rebuilt from 3 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(stdout2, "Rebuilt from 1 replicas, of which 0 had errors");
  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
    cluster_->tablet_server(i)->Shutdown();
  }
  // Restart the cluster to check cluster healthy.
  cluster_->Restart();
  WaitForTSAndReplicas();
  ClusterVerifier cv2(cluster_.get());
  NO_FATALS(cv2.CheckCluster());
}
| |
| // Test that the master rebuilder ignores tombstones. |
| TEST_F(AdminCliTest, TestRebuildMasterWithTombstones) { |
| FLAGS_num_tablet_servers = 3; |
| NO_FATALS(BuildAndStart({}, {}, {}, /*create_table*/false)); |
| |
| constexpr const char* kTable = "default.foo"; |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
| NO_FATALS(MakeTestTable(kTable, /*num_rows*/10, /*num_replicas*/1, cluster_.get())); |
| TabletServerMap ts_map; |
| ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(), |
| cluster_->messenger(), |
| &ts_map)); |
| ValueDeleter deleter(&ts_map); |
| // Find the single tablet replica and tombstone it. |
| string tablet_id; |
| TServerDetails* ts_details = nullptr; |
| for (const auto& id_and_details : ts_map) { |
| vector<string> tablet_ids; |
| ts_details = id_and_details.second; |
| ASSERT_OK(ListRunningTabletIds(ts_details, kTimeout, &tablet_ids)); |
| if (!tablet_ids.empty()) { |
| tablet_id = tablet_ids[0]; |
| break; |
| } |
| } |
| ASSERT_FALSE(tablet_id.empty()); |
| ASSERT_OK(itest::DeleteTablet(ts_details, tablet_id, tablet::TABLET_DATA_TOMBSTONED, kTimeout)); |
| ASSERT_OK(itest::WaitUntilTabletInState( |
| ts_details, tablet_id, tablet::TabletStatePB::STOPPED, kTimeout)); |
| |
| // Rebuild the master. The tombstone shouldn't be rebuilt. |
| NO_FATALS(cluster_->master()->Shutdown()); |
| ASSERT_OK(cluster_->master()->DeleteFromDisk()); |
| string stdout; |
| string stderr; |
| ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, |
| /*is_secure*/false, /*log_to_stderr*/true), |
| &stdout, &stderr)); |
| ASSERT_STR_CONTAINS(stderr, Substitute("Skipping replica of tablet $0 of table $1", |
| tablet_id, kTable)); |
| ASSERT_STR_CONTAINS(stdout, |
| "Rebuilt from 3 tablet servers, of which 0 had errors"); |
| ASSERT_STR_CONTAINS(stdout, "Rebuilt from 0 replicas, of which 0 had errors"); |
| |
| |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| cluster_->tablet_server(i)->Shutdown(); |
| } |
| ASSERT_OK(cluster_->Restart()); |
| |
| // Clients shouldn't be able to access the table -- it wasn't rebuilt. |
| KuduClientBuilder builder; |
| ASSERT_OK(cluster_->CreateClient(&builder, &client_)); |
| client::sp::shared_ptr<KuduTable> table; |
| Status s = client_->OpenTable(kTable, &table); |
| ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
| |
| // We should still be able to create a table of the same name though. |
| NO_FATALS(MakeTestTable(kTable, /*num_rows*/10, /*num_replicas*/1, cluster_.get())); |
| ASSERT_OK(client_->OpenTable(kTable, &table)); |
| |
| ClusterVerifier cv(cluster_.get()); |
| NO_FATALS(cv.CheckCluster()); |
| } |
| |
// Verify the master rebuild tool handles tablet servers holding different
// schema versions of the same table (tserver-0 is shut down before the
// second ALTER, so it keeps an outdated schema), and that the rebuilt
// table can still be altered, written to, and read back.
TEST_F(AdminCliTest, TestAddColumnsAndRebuildMaster) {
  FLAGS_num_tablet_servers = 3;
  FLAGS_num_replicas = 3;

  NO_FATALS(BuildAndStart());

  // Add a column and shutdown a tserver, the tserver holds a schema with a lower version.
  {
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableId));
    table_alterer->AddColumn("old_column_0")->Type(KuduColumnSchema::INT32);
    ASSERT_OK(table_alterer->Alter());
  }
  NO_FATALS(cluster_->tablet_server(0)->Shutdown());

  // Add another column, the latest schema has a higher version.
  {
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableId));
    table_alterer->AddColumn("old_column_1")->Type(KuduColumnSchema::INT32);
    ASSERT_OK(table_alterer->Alter());
  }

  // Shut down the master and wipe out its data.
  NO_FATALS(cluster_->master()->Shutdown());
  ASSERT_OK(cluster_->master()->DeleteFromDisk());

  // Restart the shutdown tserver, which holds a lower version schema.
  ASSERT_OK(cluster_->tablet_server(0)->Restart());

  // Rebuild the master with the tool, using only the first two tservers.
  // The tool will first see the outdated schema on tserver-0, then the
  // newer schema on tserver-1, and must pick the newer one.
  string stdout;
  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, 2,
                                         /*is_secure*/false, /*log_to_stderr*/true, "", 3),
                        &stdout));
  ASSERT_STR_CONTAINS(stdout, "Rebuilt from 2 tablet servers, of which 0 had errors");
  ASSERT_STR_CONTAINS(stdout, "Rebuilt from 2 replicas, of which 0 had errors");

  // Restart the master and the tablet servers.
  // The tablet servers must be restarted so they accept the new master's certs.
  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
    cluster_->tablet_server(i)->Shutdown();
  }
  ASSERT_OK(cluster_->Restart());
  WaitForTSAndReplicas();

  // Wait for the cluster to become healthy.
  string master_address = cluster_->master()->bound_rpc_addr().ToString();
  ASSERT_EVENTUALLY([&]() {
    ASSERT_OK(RunKuduTool({"cluster", "ksck", master_address}, nullptr, nullptr));
  });

  // The client has to be rebuilt since there's a new master.
  KuduClientBuilder builder;
  ASSERT_OK(cluster_->CreateClient(&builder, &client_));

  // Make sure we can add columns to the table we rebuilt.
  {
    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableId));
    table_alterer->AddColumn("new_column_0")->Type(KuduColumnSchema::INT32);
    ASSERT_OK(table_alterer->Alter());
  }

  // Check master and all tservers have the latest schema.
  ASSERT_OK(RunKuduTool({"table", "describe", master_address, kTableId}, &stdout));
  ASSERT_STR_MATCHES(stdout, "old_column_0.*old_column_1.*new_column_0");
  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
    const string& ts_addr = cluster_->tablet_server(i)->bound_rpc_addr().ToString();
    ASSERT_OK(RunKuduTool({"remote_replica", "list", ts_addr, "-include_schema"}, &stdout));
    ASSERT_STR_MATCHES(stdout, "old_column_0.*old_column_1.*new_column_0");
  }

  // Check the altered table schema in client view, and check it is writable and readable.
  {
    KuduSchema schema;
    ASSERT_OK(client_->GetTableSchema(kTableId, &schema));
    // 3 original columns plus the 3 columns added above.
    ASSERT_EQ(6, schema.num_columns());
    ASSERT_TRUE(schema.HasColumn("old_column_0", nullptr));
    ASSERT_TRUE(schema.HasColumn("old_column_1", nullptr));
    ASSERT_TRUE(schema.HasColumn("new_column_0", nullptr));

    client::sp::shared_ptr<KuduTable> table;
    ASSERT_OK(client_->OpenTable(kTableId, &table));

    TestWorkload workload(cluster_.get());
    workload.set_table_name(kTableId);
    workload.set_num_write_threads(1);
    workload.set_schema(table->schema());
    workload.Setup();
    workload.Start();
    // Wait for the writers to insert a meaningful number of rows.
    while (workload.rows_inserted() < 1000) {
      SleepFor(MonoDelta::FromMilliseconds(10));
    }
    workload.StopAndJoin();

    // Every scanned row should carry all three added columns.
    vector<string> rows;
    ScanTableToStrings(table.get(), &rows);
    for (const auto& row : rows) {
      ASSERT_STR_MATCHES(row, "old_column_0.*old_column_1.*new_column_0");
    }
  }
}
| |
| class SecureClusterAdminCliTest : public KuduTest { |
| public: |
| void SetUpCluster(bool is_secure) { |
| ExternalMiniClusterOptions opts; |
| opts.num_tablet_servers = 3; |
| if (is_secure) { |
| opts.enable_kerberos = true; |
| opts.principal = "oryx"; |
| } |
| cluster_.reset(new ExternalMiniCluster(std::move(opts))); |
| ASSERT_OK(cluster_->Start()); |
| KuduClientBuilder builder; |
| builder.sasl_protocol_name(kPrincipal); |
| ASSERT_OK(cluster_->CreateClient(&builder, &client_)); |
| } |
| |
| void SetUp() override { |
| NO_FATALS(SetUpCluster(/*is_secure*/true)); |
| } |
| protected: |
| unique_ptr<ExternalMiniCluster> cluster_; |
| client::sp::shared_ptr<KuduClient> client_; |
| }; |
| |
| TEST_F(SecureClusterAdminCliTest, TestNonDefaultPrincipal) { |
| ASSERT_OK(RunKuduTool({"master", |
| "list", |
| Substitute("--sasl_protocol_name=$0", kPrincipal), |
| HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs())})); |
| } |
| |
| class SecureClusterAdminCliParamTest : public SecureClusterAdminCliTest, |
| public ::testing::WithParamInterface<bool> { |
| public: |
| void SetUp() override { |
| SetUpCluster(/*is_secure*/GetParam()); |
| } |
| }; |
| |
| // Basic test that the master rebuilder works in the happy case. |
| TEST_P(SecureClusterAdminCliParamTest, TestRebuildMaster) { |
| bool is_secure = GetParam(); |
| constexpr const char* kPreRebuildTableName = "pre_rebuild"; |
| constexpr const char* kPostRebuildTableName = "post_rebuild"; |
| constexpr int kNumRows = 10000; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| |
| // Create a table and insert some rows |
| NO_FATALS(MakeTestTable(kPreRebuildTableName, kNumRows, /*num_replicas*/3, cluster_.get())); |
| |
| // Scan the table to strings so we can check it isn't corrupted post-rebuild. |
| vector<string> rows; |
| { |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kPreRebuildTableName, &table)); |
| ScanTableToStrings(table.get(), &rows); |
| } |
| |
| // Shut down the master and wipe out its data. |
| NO_FATALS(cluster_->master()->Shutdown()); |
| ASSERT_OK(cluster_->master()->DeleteFromDisk()); |
| |
| // Rebuild the master with the tool. |
| string stdout; |
| ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, is_secure), |
| &stdout)); |
| ASSERT_STR_CONTAINS(stdout, |
| "Rebuilt from 3 tablet servers, of which 0 had errors"); |
| ASSERT_STR_CONTAINS(stdout, "Rebuilt from 3 replicas, of which 0 had errors"); |
| |
| // Restart the master and the tablet servers. |
| // The tablet servers must be restarted so they accept the new master's certs. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| cluster_->tablet_server(i)->Shutdown(); |
| } |
| ASSERT_OK(cluster_->Restart()); |
| |
| // Verify we can read back exactly what we read before the rebuild. |
| // The client has to be rebuilt since there's a new master. |
| KuduClientBuilder builder; |
| ASSERT_OK(cluster_->CreateClient(&builder, &client_)); |
| vector<string> postrebuild_rows; |
| { |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kPreRebuildTableName, &table)); |
| ScanTableToStrings(table.get(), &postrebuild_rows); |
| } |
| ASSERT_EQ(rows.size(), postrebuild_rows.size()); |
| for (int i = 0; i < rows.size(); i++) { |
| ASSERT_EQ(rows[i], postrebuild_rows[i]) << "Mismatch at row " << i |
| << " of " << rows.size(); |
| } |
| // The cluster should still be considered healthy. |
| FLAGS_sasl_protocol_name = kPrincipal; |
| ClusterVerifier cv(cluster_.get()); |
| NO_FATALS(cv.CheckCluster()); |
| |
| // Make sure we can delete the table we created before the rebuild. |
| ASSERT_OK(client_->DeleteTable(kPreRebuildTableName)); |
| |
| // Make sure we can create a table and write to it. |
| NO_FATALS(MakeTestTable(kPostRebuildTableName, kNumRows, /*num_replicas*/3, cluster_.get())); |
| NO_FATALS(cv.CheckCluster()); |
| } |
| |
| TEST_P(SecureClusterAdminCliParamTest, TestRebuildMasterAndAddColumns) { |
| bool is_secure = GetParam(); |
| constexpr const char* kTableName = "rebuild_and_add_columns"; |
| constexpr int kNumRows = 10000; |
| |
| FLAGS_num_tablet_servers = 3; |
| FLAGS_num_replicas = 3; |
| |
| // Create a table and insert some rows. |
| NO_FATALS(MakeTestTable(kTableName, kNumRows, /*num_replicas*/3, cluster_.get())); |
| |
| // Shut down the master and wipe out its data. |
| NO_FATALS(cluster_->master()->Shutdown()); |
| ASSERT_OK(cluster_->master()->DeleteFromDisk()); |
| |
| // Rebuild the master with the tool. |
| string stdout; |
| ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, is_secure), |
| &stdout)); |
| ASSERT_STR_CONTAINS(stdout, |
| "Rebuilt from 3 tablet servers, of which 0 had errors"); |
| ASSERT_STR_CONTAINS(stdout, "Rebuilt from 3 replicas, of which 0 had errors"); |
| |
| // Restart the master and the tablet servers. |
| // The tablet servers must be restarted so they accept the new master's certs. |
| for (int i = 0; i < cluster_->num_tablet_servers(); i++) { |
| cluster_->tablet_server(i)->Shutdown(); |
| } |
| ASSERT_OK(cluster_->Restart()); |
| |
| // The client has to be rebuilt since there's a new master. |
| KuduClientBuilder builder; |
| ASSERT_OK(cluster_->CreateClient(&builder, &client_)); |
| |
| // Make sure we can add columns to the table we rebuilt. |
| unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
| for (int i = 0; i < 10; i++) { |
| table_alterer->AddColumn(Substitute("c$0", i))->Type(KuduColumnSchema::INT32); |
| } |
| ASSERT_OK(table_alterer->Alter()); |
| |
| // Check the altered table is readable. |
| { |
| vector<string> rows; |
| client::sp::shared_ptr<KuduTable> table; |
| ASSERT_OK(client_->OpenTable(kTableName, &table)); |
| ScanTableToStrings(table.get(), &rows); |
| } |
| |
| // The cluster should still be considered healthy. |
| FLAGS_sasl_protocol_name = kPrincipal; |
| ClusterVerifier cv(cluster_.get()); |
| NO_FATALS(cv.CheckCluster()); |
| } |
| |
// Run every SecureClusterAdminCliParamTest case both with and without
// Kerberos enabled (the boolean parameter is 'is_secure').
INSTANTIATE_TEST_SUITE_P(IsSecure, SecureClusterAdminCliParamTest,
                         ::testing::Bool());
| |
| |
| } // namespace tools |
| } // namespace kudu |