| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/integration-tests/cluster_itest_util.h" |
| |
| #include <algorithm> |
| #include <functional> |
| #include <optional> |
| #include <ostream> |
| #include <set> |
| #include <utility> |
| |
| #include <glog/logging.h> |
| #include <glog/stl_logging.h> |
| #include <gtest/gtest.h> |
| #include <rapidjson/document.h> |
| |
| #include "kudu/client/schema.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.proxy.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tserver/tablet_copy.proxy.h" |
| #include "kudu/tserver/tablet_server_test_util.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/tserver/tserver_admin.proxy.h" |
| #include "kudu/tserver/tserver_service.proxy.h" |
| #include "kudu/util/curl_util.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/jsonreader.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::cluster::ExternalTabletServer; |
| using kudu::consensus::BulkChangeConfigRequestPB; |
| using kudu::consensus::ChangeConfigRequestPB; |
| using kudu::consensus::ChangeConfigResponsePB; |
| using kudu::consensus::ConsensusStatePB; |
| using kudu::consensus::CountVoters; |
| using kudu::consensus::EXCLUDE_HEALTH_REPORT; |
| using kudu::consensus::GetConsensusStateRequestPB; |
| using kudu::consensus::GetConsensusStateResponsePB; |
| using kudu::consensus::GetLastOpIdRequestPB; |
| using kudu::consensus::GetLastOpIdResponsePB; |
| using kudu::consensus::IncludeHealthReport; |
| using kudu::consensus::LeaderStepDownRequestPB; |
| using kudu::consensus::LeaderStepDownResponsePB; |
| using kudu::consensus::OpId; |
| using kudu::consensus::OpIdType; |
| using kudu::consensus::RaftPeerPB; |
| using kudu::consensus::RunLeaderElectionResponsePB; |
| using kudu::consensus::RunLeaderElectionRequestPB; |
| using kudu::consensus::VoteRequestPB; |
| using kudu::consensus::VoteResponsePB; |
| using kudu::consensus::kInvalidOpIdIndex; |
| using kudu::master::ListTabletServersResponsePB_Entry; |
| using kudu::master::MasterServiceProxy; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::RpcController; |
| using kudu::tablet::TabletDataState; |
| using kudu::tserver::CreateTsClientProxies; |
| using kudu::tserver::ListTabletsResponsePB; |
| using kudu::tserver::DeleteTabletRequestPB; |
| using kudu::tserver::DeleteTabletResponsePB; |
| using kudu::tserver::BeginTabletCopySessionRequestPB; |
| using kudu::tserver::BeginTabletCopySessionResponsePB; |
| using kudu::tserver::TabletCopyErrorPB; |
| using kudu::tserver::TabletServerErrorPB; |
| using kudu::tserver::WriteRequestPB; |
| using kudu::tserver::WriteResponsePB; |
| using rapidjson::Value; |
| using std::min; |
| using std::optional; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace itest { |
| |
| const string& TServerDetails::uuid() const { |
| return instance_id.permanent_uuid(); |
| } |
| |
| string TServerDetails::ToString() const { |
| return Substitute("TabletServer: $0, Rpc address: $1", |
| instance_id.permanent_uuid(), |
| SecureShortDebugString(registration.rpc_addresses(0))); |
| } |
| |
| client::KuduSchema SimpleIntKeyKuduSchema() { |
| KuduSchema s; |
| KuduSchemaBuilder b; |
| b.AddColumn("key")->Type(client::KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); |
| CHECK_OK(b.Build(&s)); |
| return s; |
| } |
| |
| Status GetLastOpIdForEachReplica(const string& tablet_id, |
| const vector<const TServerDetails*>& replicas, |
| OpIdType opid_type, |
| const MonoDelta& timeout, |
| vector<OpId>* op_ids) { |
| GetLastOpIdRequestPB opid_req; |
| GetLastOpIdResponsePB opid_resp; |
| opid_req.set_tablet_id(tablet_id); |
| RpcController controller; |
| |
| op_ids->clear(); |
| for (const auto* ts : replicas) { |
| controller.Reset(); |
| controller.set_timeout(timeout); |
| opid_resp.Clear(); |
| opid_req.set_dest_uuid(ts->uuid()); |
| opid_req.set_tablet_id(tablet_id); |
| opid_req.set_opid_type(opid_type); |
| RETURN_NOT_OK_PREPEND( |
| ts->consensus_proxy->GetLastOpId(opid_req, &opid_resp, &controller), |
| Substitute("Failed to fetch last op id from $0", |
| SecureShortDebugString(ts->instance_id))); |
| op_ids->push_back(opid_resp.opid()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status GetLastOpIdForReplica(const std::string& tablet_id, |
| const TServerDetails* replica, |
| OpIdType opid_type, |
| const MonoDelta& timeout, |
| consensus::OpId* op_id) { |
| vector<OpId> op_ids; |
| RETURN_NOT_OK(GetLastOpIdForEachReplica(tablet_id, { replica }, opid_type, timeout, &op_ids)); |
| CHECK_EQ(1, op_ids.size()); |
| *op_id = op_ids[0]; |
| return Status::OK(); |
| } |
| |
| Status WaitForOpFromCurrentTerm(TServerDetails* replica, |
| const string& tablet_id, |
| OpIdType opid_type, |
| const MonoDelta& timeout, |
| OpId* opid) { |
| const MonoTime kStart = MonoTime::Now(); |
| const MonoTime kDeadline = kStart + timeout; |
| |
| Status s; |
| while (MonoTime::Now() < kDeadline) { |
| ConsensusStatePB cstate; |
| s = GetConsensusState(replica, tablet_id, kDeadline - MonoTime::Now(), EXCLUDE_HEALTH_REPORT, |
| &cstate); |
| if (s.ok()) { |
| OpId tmp_opid; |
| s = GetLastOpIdForReplica(tablet_id, replica, opid_type, kDeadline - MonoTime::Now(), |
| &tmp_opid); |
| if (s.ok()) { |
| if (tmp_opid.term() == cstate.current_term()) { |
| if (opid) { |
| opid->Swap(&tmp_opid); |
| } |
| return Status::OK(); |
| } |
| s = Status::IllegalState(Substitute("Terms don't match. Current term: $0. Latest OpId: $1", |
| cstate.current_term(), OpIdToString(tmp_opid))); |
| } |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
| return Status::TimedOut(Substitute("Timed out after $0 waiting for op from current term: $1", |
| (MonoTime::Now() - kStart).ToString(), |
| s.ToString())); |
| } |
| |
| Status WaitForServersToAgree(const MonoDelta& timeout, |
| const TabletServerMap& tablet_servers, |
| const string& tablet_id, |
| int64_t minimum_index, |
| consensus::OpIdType op_id_type) { |
| const MonoTime deadline = MonoTime::Now() + timeout; |
| |
| vector<const TServerDetails*> servers; |
| AppendValuesFromMap(tablet_servers, &servers); |
| for (int i = 1; MonoTime::Now() < deadline; i++) { |
| vector<OpId> ids; |
| Status s = GetLastOpIdForEachReplica( |
| tablet_id, servers, op_id_type, timeout, &ids); |
| if (s.ok()) { |
| bool any_behind = false; |
| bool any_disagree = false; |
| int64_t cur_index = kInvalidOpIdIndex; |
| for (const OpId& id : ids) { |
| if (cur_index == kInvalidOpIdIndex) { |
| cur_index = id.index(); |
| } |
| if (id.index() != cur_index) { |
| any_disagree = true; |
| break; |
| } |
| if (id.index() < minimum_index) { |
| any_behind = true; |
| break; |
| } |
| } |
| if (!any_behind && !any_disagree) { |
| return Status::OK(); |
| } |
| } else { |
| LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString(); |
| } |
| |
| LOG(INFO) << "Not converged past " << minimum_index << " yet: " << ids; |
| SleepFor(MonoDelta::FromMilliseconds(min(i * 100, 1000))); |
| } |
| return Status::TimedOut( |
| Substitute("index $0 not available on all replicas after $1", |
| minimum_index, timeout.ToString())); |
| } |
| |
| Status CreateTabletServerMap(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const shared_ptr<Messenger>& messenger, |
| unordered_map<string, TServerDetails*>* ts_map) { |
| CHECK(ts_map->empty()); |
| const MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
| vector<ListTabletServersResponsePB_Entry> tservers; |
| RETURN_NOT_OK(ListTabletServers(master_proxy, kTimeout, &tservers)); |
| |
| for (const auto& entry : tservers) { |
| HostPort host_port = HostPortFromPB(entry.registration().rpc_addresses(0)); |
| vector<Sockaddr> addresses; |
| host_port.ResolveAddresses(&addresses); |
| |
| unique_ptr<TServerDetails> peer(new TServerDetails); |
| peer->instance_id.CopyFrom(entry.instance_id()); |
| peer->registration.CopyFrom(entry.registration()); |
| peer->location = entry.location(); |
| |
| CreateTsClientProxies(addresses[0], |
| messenger, |
| &peer->tablet_copy_proxy, |
| &peer->tserver_proxy, |
| &peer->tserver_admin_proxy, |
| &peer->consensus_proxy, |
| &peer->generic_proxy); |
| |
| InsertOrDie(ts_map, peer->instance_id.permanent_uuid(), peer.get()); |
| ignore_result(peer.release()); |
| } |
| return Status::OK(); |
| } |
| |
| Status GetConsensusState(const TServerDetails* replica, |
| const string& tablet_id, |
| const MonoDelta& timeout, |
| IncludeHealthReport report_health, |
| ConsensusStatePB* consensus_state) { |
| GetConsensusStateRequestPB req; |
| GetConsensusStateResponsePB resp; |
| RpcController controller; |
| controller.set_timeout(timeout); |
| req.set_dest_uuid(replica->uuid()); |
| req.add_tablet_ids(tablet_id); |
| req.set_report_health(report_health); |
| |
| RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(req, &resp, &controller)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| if (resp.tablets_size() == 0) { |
| return Status::NotFound("tablet not found:", tablet_id); |
| } |
| DCHECK_EQ(1, resp.tablets_size()); |
| *consensus_state = resp.tablets(0).cstate(); |
| return Status::OK(); |
| } |
| |
| Status WaitUntilNoPendingConfig(const TServerDetails* replica, |
| const std::string& tablet_id, |
| const MonoDelta& timeout, |
| consensus::ConsensusStatePB* cstate) { |
| ConsensusStatePB cstate_tmp; |
| MonoTime start = MonoTime::Now(); |
| MonoTime deadline = start + timeout; |
| MonoTime now; |
| Status s; |
| while ((now = MonoTime::Now()) < deadline) { |
| s = GetConsensusState(replica, tablet_id, deadline - now, EXCLUDE_HEALTH_REPORT, &cstate_tmp); |
| if (s.ok() && !cstate_tmp.has_pending_config()) { |
| if (cstate) { |
| *cstate = std::move(cstate_tmp); |
| } |
| return Status::OK(); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(30)); |
| } |
| return Status::TimedOut(Substitute("There is still a pending config after waiting for $0. " |
| "Last consensus state: $1. Last status: $2", |
| (MonoTime::Now() - start).ToString(), |
| SecureShortDebugString(cstate_tmp), s.ToString())); |
| } |
| |
| Status WaitUntilCommittedConfigNumVotersIs(int num_voters, |
| const TServerDetails* replica, |
| const std::string& tablet_id, |
| const MonoDelta& timeout) { |
| MonoTime start = MonoTime::Now(); |
| MonoTime deadline = start + timeout; |
| |
| int backoff_exp = 0; |
| const int kMaxBackoffExp = 7; |
| Status s; |
| ConsensusStatePB cstate; |
| while (true) { |
| MonoDelta remaining_timeout = deadline - MonoTime::Now(); |
| s = GetConsensusState(replica, tablet_id, remaining_timeout, EXCLUDE_HEALTH_REPORT, &cstate); |
| if (s.ok()) { |
| if (CountVoters(cstate.committed_config()) == num_voters) { |
| return Status::OK(); |
| } |
| } |
| |
| if (MonoTime::Now() > start + timeout) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(1LLU << backoff_exp)); |
| backoff_exp = min(backoff_exp + 1, kMaxBackoffExp); |
| } |
| return Status::TimedOut(Substitute("Number of voters does not equal $0 after waiting for $1. " |
| "Last consensus state: $2. Last status: $3", |
| num_voters, timeout.ToString(), |
| SecureShortDebugString(cstate), s.ToString())); |
| } |
| |
| void WaitUntilCommittedConfigNumMembersIs(int num_members, |
| const TServerDetails* replica, |
| const std::string& tablet_id, |
| const MonoDelta& timeout) { |
| MonoTime deadline = MonoTime::Now() + timeout; |
| AssertEventually([&] { |
| ConsensusStatePB cstate; |
| ASSERT_OK(GetConsensusState(replica, tablet_id, deadline - MonoTime::Now(), |
| EXCLUDE_HEALTH_REPORT, &cstate)); |
| ASSERT_EQ(num_members, cstate.committed_config().peers_size()); |
| }, timeout); |
| NO_PENDING_FATALS(); |
| } |
| |
| Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index, |
| const TServerDetails* replica, |
| const std::string& tablet_id, |
| const MonoDelta& timeout) { |
| MonoTime start = MonoTime::Now(); |
| MonoTime deadline = start + timeout; |
| |
| Status s; |
| ConsensusStatePB cstate; |
| while (true) { |
| MonoDelta remaining_timeout = deadline - MonoTime::Now(); |
| s = GetConsensusState(replica, tablet_id, remaining_timeout, EXCLUDE_HEALTH_REPORT, &cstate); |
| if (s.ok() && cstate.committed_config().opid_index() == opid_index) { |
| return Status::OK(); |
| } |
| if (MonoTime::Now() > deadline) break; |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| return Status::TimedOut(Substitute("Committed config opid_index does not equal $0 " |
| "after waiting for $1. " |
| "Last consensus state: $2. Last status: $3", |
| opid_index, |
| (MonoTime::Now() - start).ToString(), |
| SecureShortDebugString(cstate), s.ToString())); |
| } |
| |
| Status ListTabletServers( |
| const shared_ptr<MasterServiceProxy>& master_proxy, |
| const MonoDelta& timeout, |
| vector<ListTabletServersResponsePB_Entry>* tservers) { |
| master::ListTabletServersRequestPB req; |
| master::ListTabletServersResponsePB resp; |
| RpcController controller; |
| controller.set_timeout(timeout); |
| |
| RETURN_NOT_OK(master_proxy->ListTabletServers(req, &resp, &controller)); |
| RETURN_NOT_OK(controller.status()); |
| if (resp.has_error()) { |
| return Status::RemoteError("Response had an error", SecureShortDebugString(resp.error())); |
| } |
| tservers->assign(resp.servers().begin(), resp.servers().end()); |
| return Status::OK(); |
| } |
| |
| Status WaitForNumTabletServers( |
| const shared_ptr<MasterServiceProxy>& master_proxy, |
| int num_servers, const MonoDelta& timeout) { |
| const MonoTime kStartTime = MonoTime::Now(); |
| const MonoTime kDeadline = kStartTime + timeout; |
| vector<ListTabletServersResponsePB_Entry> tservers; |
| while (MonoTime::Now() < kDeadline) { |
| RETURN_NOT_OK(ListTabletServers(master_proxy, kDeadline - MonoTime::Now(), &tservers)); |
| if (tservers.size() >= num_servers) return Status::OK(); |
| SleepFor(MonoDelta::FromMilliseconds(50)); |
| } |
| |
| return Status::TimedOut(Substitute( |
| "Timed out waiting for $0 tablet servers to be registered with the master. Found $1", |
| num_servers, tservers.size())); |
| } |
| |
| Status WaitForReplicasReportedToMaster( |
| const shared_ptr<master::MasterServiceProxy>& master_proxy, |
| int num_replicas, |
| const string& tablet_id, |
| const MonoDelta& timeout, |
| WaitForLeader wait_for_leader, |
| master::ReplicaTypeFilter filter, |
| bool* has_leader, |
| master::GetTabletLocationsResponsePB* tablet_locations) { |
| MonoTime deadline(MonoTime::Now() + timeout); |
| while (true) { |
| RETURN_NOT_OK(GetTabletLocations(master_proxy, tablet_id, timeout, filter, tablet_locations)); |
| *has_leader = false; |
| if (tablet_locations->tablet_locations(0).interned_replicas_size() == num_replicas) { |
| for (const auto& replica : tablet_locations->tablet_locations(0).interned_replicas()) { |
| if (replica.role() == RaftPeerPB::LEADER) { |
| *has_leader = true; |
| } |
| } |
| if (wait_for_leader == DONT_WAIT_FOR_LEADER || |
| (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) { |
| break; |
| } |
| } |
| if (deadline < MonoTime::Now()) { |
| return Status::TimedOut(Substitute("Timed out while waiting " |
| "for tablet $0 reporting to master with $1 replicas, has_leader: $2", |
| tablet_id, num_replicas, *has_leader)); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(20)); |
| } |
| if (num_replicas != tablet_locations->tablet_locations(0).interned_replicas_size()) { |
| return Status::NotFound(Substitute("Number of replicas for tablet $0 " |
| "reported to master $1:$2", |
| tablet_id, tablet_locations->tablet_locations(0).interned_replicas_size(), |
| SecureDebugString(*tablet_locations))); |
| } |
| if (wait_for_leader == WAIT_FOR_LEADER && !(*has_leader)) { |
| return Status::NotFound(Substitute( |
| "Leader for tablet $0 not found on master, number of replicas $1:$2", |
| tablet_id, |
| tablet_locations->tablet_locations(0).interned_replicas_size(), |
| SecureDebugString(*tablet_locations))); |
| } |
| return Status::OK(); |
| } |
| |
| Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index, |
| TServerDetails* replica, |
| const string& tablet_id, |
| const MonoDelta& timeout) { |
| MonoTime start = MonoTime::Now(); |
| MonoTime deadline = start + timeout; |
| |
| Status s; |
| OpId op_id; |
| while (true) { |
| MonoDelta remaining_timeout = deadline - MonoTime::Now(); |
| s = GetLastOpIdForReplica(tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout, |
| &op_id); |
| if (s.ok() && op_id.index() == opid_index) { |
| return Status::OK(); |
| } |
| if (MonoTime::Now() > deadline) break; |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| return Status::TimedOut(Substitute("Committed consensus opid_index does not equal $0 " |
| "after waiting for $1. Last opid: $2. Last status: $3", |
| opid_index, |
| (MonoTime::Now() - start).ToString(), |
| OpIdToString(op_id), |
| s.ToString())); |
| } |
| |
| Status GetReplicaStatusAndCheckIfLeader(const TServerDetails* replica, |
| const string& tablet_id, |
| const MonoDelta& timeout) { |
| ConsensusStatePB cstate; |
| Status s = GetConsensusState(replica, tablet_id, timeout, EXCLUDE_HEALTH_REPORT, &cstate); |
| if (PREDICT_FALSE(!s.ok())) { |
| VLOG(1) << "Error getting consensus state from replica: " |
| << replica->instance_id.permanent_uuid(); |
| return Status::NotFound("Error connecting to replica", s.ToString()); |
| } |
| const string& replica_uuid = replica->instance_id.permanent_uuid(); |
| if (cstate.leader_uuid() == replica_uuid) { |
| return Status::OK(); |
| } |
| VLOG(1) << "Replica not leader of config: " << replica->instance_id.permanent_uuid(); |
| return Status::IllegalState("Replica found but not leader"); |
| } |
| |
| Status WaitUntilLeader(const TServerDetails* replica, |
| const string& tablet_id, |
| const MonoDelta& timeout) { |
| MonoTime start = MonoTime::Now(); |
| MonoTime deadline = start + timeout; |
| |
| int backoff_exp = 0; |
| const int kMaxBackoffExp = 7; |
| Status s; |
| while (true) { |
| MonoDelta remaining_timeout = deadline - MonoTime::Now(); |
| s = GetReplicaStatusAndCheckIfLeader(replica, tablet_id, remaining_timeout); |
| if (s.ok()) { |
| return Status::OK(); |
| } |
| |
| if (MonoTime::Now() > deadline) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(1LLU << backoff_exp)); |
| backoff_exp = min(backoff_exp + 1, kMaxBackoffExp); |
| } |
| return Status::TimedOut(Substitute("Replica $0 is not leader after waiting for $1: $2", |
| replica->ToString(), timeout.ToString(), s.ToString())); |
| } |
| |
| Status FindTabletLeader(const TabletServerMap& tablet_servers, |
| const string& tablet_id, |
| const MonoDelta& timeout, |
| TServerDetails** leader) { |
| vector<TServerDetails*> tservers; |
| AppendValuesFromMap(tablet_servers, &tservers); |
| |
| MonoTime start = MonoTime::Now(); |
| MonoTime deadline = start + timeout; |
| Status s; |
| int i = 0; |
| while (true) { |
| MonoDelta remaining_timeout = deadline - MonoTime::Now(); |
| s = GetReplicaStatusAndCheckIfLeader(tservers[i], tablet_id, remaining_timeout); |
| if (s.ok()) { |
| *leader = tservers[i]; |
| return Status::OK(); |
| } |
| |
| if (MonoTime::Now() > deadline) break; |
| i = (i + 1) % tservers.size(); |
| if (i == 0) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| } |
| return Status::TimedOut(Substitute("Unable to find leader of tablet $0 after $1. " |
| "Status message: $2", tablet_id, |
| (MonoTime::Now() - start).ToString(), |
| s.ToString())); |
| } |
| |
// Appends to the out parameter "followers" the tablet servers that host a non-leader
// replica of "tablet_id". "followers" is not cleared first.
// "tablet_servers" must be non-empty (DCHECKed). It is expected to include every tablet
// server hosting "tablet_id", not just a subset of them. If some reported peer is missing
// from "tablet_servers", InvalidArgument is returned.
// "timeout" bounds the whole lookup: the deadline is checked before each tablet server is
// queried, and the remaining time is used as that query's RPC timeout.
// Returns:
//   - TimedOut if "timeout" elapses before every supplied tablet server has been queried;
//   - IllegalState if tablet servers hosting the tablet disagree on the leader or on the
//     set of peers, or if none of them reports both a leader and a non-empty committed
//     config;
//   - NotFound if no supplied tablet server returned a consensus state for the tablet
//     (e.g. none of them hosts it, or none is reachable).
Status FindTabletFollowers(const TabletServerMap& tablet_servers,
                           const string& tablet_id,
                           const MonoDelta& timeout,
                           vector<TServerDetails*>* followers) {
  DCHECK(!tablet_servers.empty());

  // Sorted sets allow for set intersection needed below.
  // uuids of all supplied tablet servers.
  std::set<string> tablet_server_uuids;
  // uuids of the tablet server peers that host the specified tablet_id.
  std::set<string> peer_uuids;
  // uuid of the leader tablet server hosting the specified tablet_id.
  string leader_uuid;
  const MonoTime start = MonoTime::Now();
  const MonoTime deadline = MonoTime::Now() + timeout;
  // Set when at least one server hosting the tablet reported no leader or an empty
  // committed config. Used below to choose between IllegalState and NotFound when no
  // leader was found at all.
  bool no_leader_or_peers_found = false;
  for (const auto& entry : tablet_servers) {
    const auto now = MonoTime::Now();
    if (now > deadline) {
      return Status::TimedOut(Substitute("Unable to find followers for tablet $0 after $1.",
                                         tablet_id, (now - start).ToString()));
    }
    const auto& tserver_uuid = entry.first;
    const auto* tserver = entry.second;
    tablet_server_uuids.emplace(tserver_uuid);

    const MonoDelta remaining_timeout = deadline - now;
    ConsensusStatePB cstate;
    Status s = GetConsensusState(tserver, tablet_id, remaining_timeout, EXCLUDE_HEALTH_REPORT,
                                 &cstate);
    if (!s.ok()) {
      // The failure may simply mean this tablet server does not host the tablet, which is
      // fine, or it may indicate another problem. If every tablet server fails, the
      // failure is reported after the loop.
      continue;
    }
    // At this point the tablet server hosts the tablet, but it may not know the leader
    // or the peers yet.
    if (cstate.committed_config().peers_size() == 0 || !cstate.has_leader_uuid()) {
      no_leader_or_peers_found = true;
      continue;
    }

    std::set<string> curr_peer_uuids;
    for (const auto& peer : cstate.committed_config().peers()) {
      curr_peer_uuids.emplace(peer.permanent_uuid());
    }
    DCHECK(!curr_peer_uuids.empty());
    DCHECK(!cstate.leader_uuid().empty());
    if (!leader_uuid.empty()) {
      DCHECK(!peer_uuids.empty());
      // Sanity check that all tablet servers hosting the tablet report the same leader
      // and the same set of peers.
      if (leader_uuid != cstate.leader_uuid()) {
        return Status::IllegalState(Substitute(
            "Leader $0 reported by tablet server $1 for tablet $2 doesn't match with leader $3 "
            "reported by other tablet servers.", cstate.leader_uuid(), tserver_uuid, tablet_id,
            leader_uuid));
      }
      if (peer_uuids != curr_peer_uuids) {
        return Status::IllegalState(Substitute(
            "Peers reported by tablet server $0 for tablet $1 don't match with peers reported by "
            "other tablet servers.", tserver_uuid, tablet_id));
      }
    } else {
      // This is the first server to report both a leader and peers; its view becomes the
      // reference for the remaining servers.
      DCHECK(peer_uuids.empty());
      leader_uuid = cstate.leader_uuid();
      peer_uuids.swap(curr_peer_uuids);
    }
  }

  // Could not get leader and peer information from any of the supplied tablet servers.
  if (leader_uuid.empty()) {
    DCHECK(peer_uuids.empty());

    return no_leader_or_peers_found ?
        Status::IllegalState(
            Substitute("No leader or peers found for tablet $0.", tablet_id)) :
        Status::NotFound(
            Substitute("No tablet server found with tablet $0 or tablet servers not reachable.",
                       tablet_id));
  }

  // Every peer must be one of the supplied tablet servers; otherwise FindOrDie() below
  // would crash.
  if (peer_uuids != STLSetIntersection(peer_uuids, tablet_server_uuids)) {
    return Status::InvalidArgument(Substitute(
        "Not all peers reported by tablet servers are part of the supplied tablet servers."));
  }

  for (const auto& tserver_uuid : peer_uuids) {
    if (tserver_uuid != leader_uuid) {
      followers->emplace_back(FindOrDie(tablet_servers, tserver_uuid));
    }
  }

  return Status::OK();
}
| |
| Status StartElection(const TServerDetails* replica, |
| const string& tablet_id, |
| const MonoDelta& timeout) { |
| RunLeaderElectionRequestPB req; |
| req.set_dest_uuid(replica->uuid()); |
| req.set_tablet_id(tablet_id); |
| RunLeaderElectionResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(replica->consensus_proxy->RunLeaderElection(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()) |
| .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code()))); |
| } |
| return Status::OK(); |
| } |
| |
| Status RequestVote(const TServerDetails* replica, |
| const std::string& tablet_id, |
| const std::string& candidate_uuid, |
| int64_t candidate_term, |
| const consensus::OpId& last_logged_opid, |
| optional<bool> ignore_live_leader, |
| optional<bool> is_pre_election, |
| const MonoDelta& timeout) { |
| DCHECK(last_logged_opid.IsInitialized()); |
| VoteRequestPB req; |
| req.set_dest_uuid(replica->uuid()); |
| req.set_tablet_id(tablet_id); |
| req.set_candidate_uuid(candidate_uuid); |
| req.set_candidate_term(candidate_term); |
| *req.mutable_candidate_status()->mutable_last_received() = last_logged_opid; |
| if (ignore_live_leader) req.set_ignore_live_leader(*ignore_live_leader); |
| if (is_pre_election) req.set_is_pre_election(*is_pre_election); |
| VoteResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(replica->consensus_proxy->RequestConsensusVote(req, &resp, &rpc)); |
| if (resp.has_vote_granted() && resp.vote_granted()) return Status::OK(); |
| if (resp.has_error()) return StatusFromPB(resp.error().status()); |
| if (resp.has_consensus_error()) return StatusFromPB(resp.consensus_error().status()); |
| return Status::IllegalState("Unknown error"); |
| } |
| |
| Status LeaderStepDown(const TServerDetails* replica, |
| const string& tablet_id, |
| const MonoDelta& timeout, |
| TabletServerErrorPB* error, |
| const std::string& new_leader_uuid) { |
| LeaderStepDownRequestPB req; |
| req.set_dest_uuid(replica->uuid()); |
| req.set_tablet_id(tablet_id); |
| if (!new_leader_uuid.empty()) { |
| req.set_new_leader_uuid(new_leader_uuid); |
| req.set_mode(consensus::GRACEFUL); |
| } |
| LeaderStepDownResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(replica->consensus_proxy->LeaderStepDown(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| if (error != nullptr) { |
| *error = resp.error(); |
| } |
| return StatusFromPB(resp.error().status()) |
| .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code()))); |
| } |
| return Status::OK(); |
| } |
| |
| Status WriteSimpleTestRow(const TServerDetails* replica, |
| const std::string& tablet_id, |
| RowOperationsPB::Type write_type, |
| int32_t key, |
| int32_t int_val, |
| const string& string_val, |
| const MonoDelta& timeout) { |
| WriteRequestPB req; |
| WriteResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| |
| req.set_tablet_id(tablet_id); |
| Schema schema = GetSimpleTestSchema(); |
| RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema())); |
| AddTestRowToPB(write_type, schema, key, int_val, string_val, req.mutable_row_operations()); |
| |
| RETURN_NOT_OK(replica->tserver_proxy->Write(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status AddServer(const TServerDetails* leader, |
| const std::string& tablet_id, |
| const TServerDetails* replica_to_add, |
| consensus::RaftPeerPB::MemberType member_type, |
| const MonoDelta& timeout, |
| const consensus::RaftPeerAttrsPB& attrs, |
| const optional<int64_t>& cas_config_index, |
| TabletServerErrorPB::Code* error_code) { |
| ChangeConfigRequestPB req; |
| req.set_dest_uuid(leader->uuid()); |
| req.set_tablet_id(tablet_id); |
| req.set_type(consensus::ADD_PEER); |
| RaftPeerPB* peer = req.mutable_server(); |
| peer->set_permanent_uuid(replica_to_add->uuid()); |
| peer->set_member_type(member_type); |
| *peer->mutable_attrs() = attrs; |
| *peer->mutable_last_known_addr() = replica_to_add->registration.rpc_addresses(0); |
| if (cas_config_index) { |
| req.set_cas_config_opid_index(*cas_config_index); |
| } |
| |
| ChangeConfigResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| if (error_code) { |
| *error_code = resp.error().code(); |
| } |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status RemoveServer(const TServerDetails* leader, |
| const std::string& tablet_id, |
| const TServerDetails* replica_to_remove, |
| const MonoDelta& timeout, |
| const optional<int64_t>& cas_config_index, |
| TabletServerErrorPB::Code* error_code) { |
| ChangeConfigRequestPB req; |
| req.set_dest_uuid(leader->uuid()); |
| req.set_tablet_id(tablet_id); |
| req.set_type(consensus::REMOVE_PEER); |
| if (cas_config_index) { |
| req.set_cas_config_opid_index(*cas_config_index); |
| } |
| RaftPeerPB* peer = req.mutable_server(); |
| peer->set_permanent_uuid(replica_to_remove->uuid()); |
| |
| ChangeConfigResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| if (error_code) { |
| *error_code = resp.error().code(); |
| } |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status ChangeReplicaType(const TServerDetails* leader, |
| const std::string& tablet_id, |
| const TServerDetails* target_replica, |
| RaftPeerPB::MemberType replica_type, |
| const MonoDelta& timeout, |
| const optional<int64_t>& cas_config_index, |
| tserver::TabletServerErrorPB::Code* error_code) { |
| ChangeConfigRequestPB req; |
| req.set_dest_uuid(leader->uuid()); |
| req.set_tablet_id(tablet_id); |
| req.set_type(consensus::MODIFY_PEER); |
| if (cas_config_index) { |
| req.set_cas_config_opid_index(*cas_config_index); |
| } |
| RaftPeerPB* peer = req.mutable_server(); |
| peer->set_permanent_uuid(target_replica->uuid()); |
| peer->set_member_type(replica_type); |
| |
| ChangeConfigResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| if (error_code) { |
| *error_code = resp.error().code(); |
| } |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status BulkChangeConfig(const TServerDetails* leader, |
| const std::string& tablet_id, |
| const vector<BulkChangeConfigRequestPB::ConfigChangeItemPB>& changes, |
| const MonoDelta& timeout, |
| const optional<int64_t>& cas_config_index, |
| tserver::TabletServerErrorPB::Code* error_code) { |
| BulkChangeConfigRequestPB req; |
| req.set_dest_uuid(leader->uuid()); |
| req.set_tablet_id(tablet_id); |
| if (cas_config_index) { |
| req.set_cas_config_opid_index(*cas_config_index); |
| } |
| for (const auto& change : changes) { |
| *req.add_config_changes() = change; |
| } |
| |
| ChangeConfigResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(leader->consensus_proxy->BulkChangeConfig(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| if (error_code) { |
| *error_code = resp.error().code(); |
| } |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status ListTablets(const TServerDetails* ts, |
| const MonoDelta& timeout, |
| vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) { |
| tserver::ListTabletsRequestPB req; |
| tserver::ListTabletsResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| |
| RETURN_NOT_OK(ts->tserver_proxy->ListTablets(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| |
| tablets->assign(resp.status_and_schema().begin(), resp.status_and_schema().end()); |
| return Status::OK(); |
| } |
| |
| Status ListRunningTabletIds(const TServerDetails* ts, |
| const MonoDelta& timeout, |
| vector<string>* tablet_ids) { |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; |
| RETURN_NOT_OK(ListTablets(ts, timeout, &tablets)); |
| tablet_ids->clear(); |
| for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) { |
| if (t.tablet_status().state() == tablet::RUNNING) { |
| tablet_ids->push_back(t.tablet_status().tablet_id()); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status GetTabletLocations(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const string& tablet_id, |
| const MonoDelta& timeout, |
| master::ReplicaTypeFilter filter, |
| master::GetTabletLocationsResponsePB* tablet_locations) { |
| master::GetTabletLocationsRequestPB req; |
| *req.add_tablet_ids() = tablet_id; |
| req.set_replica_type_filter(filter); |
| req.set_intern_ts_infos_in_response(true); |
| rpc::RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(master_proxy->GetTabletLocations(req, tablet_locations, &rpc)); |
| if (tablet_locations->has_error()) { |
| return StatusFromPB(tablet_locations->error().status()); |
| } |
| if (tablet_locations->errors_size() > 0) { |
| CHECK_EQ(1, tablet_locations->errors_size()) |
| << SecureShortDebugString(*tablet_locations); |
| return StatusFromPB(tablet_locations->errors(0).status()); |
| } |
| CHECK_EQ(1, tablet_locations->tablet_locations_size()) |
| << SecureShortDebugString(*tablet_locations); |
| return Status::OK(); |
| } |
| |
| Status ListTablesWithInfo(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const string& filter, |
| const MonoDelta& timeout, |
| master::ListTablesResponsePB* tables_info) { |
| master::ListTablesRequestPB req; |
| if (!filter.empty()) { |
| req.set_name_filter(filter); |
| } |
| req.set_list_tablet_with_partition(true); |
| rpc::RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(master_proxy->ListTables(req, tables_info, &rpc)); |
| if (tables_info->has_error()) { |
| return StatusFromPB(tables_info->error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status GetTableLocations(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const string& table_name, |
| const MonoDelta& timeout, |
| master::ReplicaTypeFilter filter, |
| const optional<string>& table_id, |
| master::GetTableLocationsResponsePB* table_locations) { |
| master::GetTableLocationsRequestPB req; |
| req.mutable_table()->set_table_name(table_name); |
| if (table_id) { |
| req.mutable_table()->set_table_id(*table_id); |
| } |
| req.set_replica_type_filter(filter); |
| req.set_max_returned_locations(1000); |
| req.set_intern_ts_infos_in_response(true); |
| rpc::RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(master_proxy->GetTableLocations(req, table_locations, &rpc)); |
| if (table_locations->has_error()) { |
| return StatusFromPB(table_locations->error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status WaitForNumVotersInConfigOnMaster(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const std::string& tablet_id, |
| int num_voters, |
| const MonoDelta& timeout) { |
| Status s; |
| MonoTime deadline = MonoTime::Now() + timeout; |
| int num_voters_found = 0; |
| while (true) { |
| master::GetTabletLocationsResponsePB tablet_locations; |
| MonoDelta time_remaining = deadline - MonoTime::Now(); |
| s = GetTabletLocations(master_proxy, tablet_id, time_remaining, |
| master::VOTER_REPLICA, &tablet_locations); |
| if (s.ok()) { |
| num_voters_found = 0; |
| for (const auto& r : tablet_locations.tablet_locations(0).interned_replicas()) { |
| if (r.role() == RaftPeerPB::LEADER || r.role() == RaftPeerPB::FOLLOWER) num_voters_found++; |
| } |
| if (num_voters_found == num_voters) break; |
| } |
| if (MonoTime::Now() > deadline) break; |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| RETURN_NOT_OK(s); |
| if (num_voters_found != num_voters) { |
| return Status::IllegalState( |
| Substitute("Did not find exactly $0 voters, found $1 voters", |
| num_voters, num_voters_found)); |
| } |
| return Status::OK(); |
| } |
| |
| Status WaitForNumTabletsOnTS(const TServerDetails* ts, |
| int count, |
| const MonoDelta& timeout, |
| vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets, |
| optional<tablet::TabletStatePB> state) { |
| // If the user doesn't care about collecting the resulting tablets, collect into a local |
| // vector. |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets_local; |
| if (tablets == nullptr) tablets = &tablets_local; |
| |
| Status s; |
| MonoTime deadline = MonoTime::Now() + timeout; |
| while (true) { |
| s = ListTablets(ts, MonoDelta::FromSeconds(10), tablets); |
| if (s.ok() && state) { |
| tablets->erase( |
| std::remove_if(tablets->begin(), tablets->end(), |
| [&] (const ListTabletsResponsePB::StatusAndSchemaPB& t) { |
| return t.tablet_status().state() != state; |
| }), |
| tablets->end()); |
| } |
| |
| if (s.ok() && tablets->size() == count) break; |
| if (MonoTime::Now() > deadline) break; |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| RETURN_NOT_OK(s); |
| if (tablets->size() != count) { |
| return Status::IllegalState( |
| Substitute("Did not find exactly $0 tablets, found $1 tablets", |
| count, tablets->size())); |
| } |
| return Status::OK(); |
| } |
| |
| Status CheckIfTabletInState(const TServerDetails* ts, |
| const std::string& tablet_id, |
| tablet::TabletStatePB expected_state, |
| const MonoDelta& timeout) { |
| vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; |
| RETURN_NOT_OK(ListTablets(ts, timeout, &tablets)); |
| for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) { |
| if (t.tablet_status().tablet_id() == tablet_id) { |
| tablet::TabletStatePB current_state = t.tablet_status().state(); |
| if (current_state != expected_state) { |
| return Status::IllegalState(Substitute("Tablet not in expected state $0 (state = $1)", |
| TabletStatePB_Name(expected_state), |
| TabletStatePB_Name(current_state))); |
| } |
| return Status::OK(); |
| } |
| } |
| return Status::NotFound("Tablet " + tablet_id + " not found"); |
| } |
| |
| Status CheckIfTabletRunning(const TServerDetails* ts, |
| const std::string& tablet_id, |
| const MonoDelta& timeout) { |
| return CheckIfTabletInState(ts, tablet_id, tablet::RUNNING, timeout); |
| } |
| |
| Status WaitUntilTabletInState(const TServerDetails* ts, |
| const std::string& tablet_id, |
| tablet::TabletStatePB state, |
| const MonoDelta& timeout) { |
| static const MonoDelta kSleepInterval = MonoDelta::FromMilliseconds(10); |
| const MonoTime start = MonoTime::Now(); |
| const MonoTime deadline = start + timeout; |
| MonoTime now; |
| Status s; |
| while (true) { |
| now = MonoTime::Now(); |
| s = CheckIfTabletInState(ts, tablet_id, state, deadline - now); |
| if (s.ok()) { |
| return Status::OK(); |
| } |
| if (now + kSleepInterval >= deadline) { |
| break; |
| } |
| SleepFor(kSleepInterval); |
| } |
| return Status::TimedOut(Substitute("T $0 P $1: Tablet not in $2 state after $3: " |
| "Status message: $4", |
| tablet_id, ts->uuid(), |
| tablet::TabletStatePB_Name(state), |
| (now - start).ToString(), |
| s.ToString())); |
| } |
| |
| // Wait until the specified tablet is in RUNNING state. |
| Status WaitUntilTabletRunning(const TServerDetails* ts, |
| const std::string& tablet_id, |
| const MonoDelta& timeout) { |
| return WaitUntilTabletInState(ts, tablet_id, tablet::RUNNING, timeout); |
| } |
| |
| Status DeleteTablet(const TServerDetails* ts, |
| const std::string& tablet_id, |
| const TabletDataState& delete_type, |
| const MonoDelta& timeout, |
| const optional<int64_t>& cas_config_index, |
| tserver::TabletServerErrorPB::Code* error_code) { |
| DeleteTabletRequestPB req; |
| req.set_dest_uuid(ts->uuid()); |
| req.set_tablet_id(tablet_id); |
| req.set_delete_type(delete_type); |
| if (cas_config_index) { |
| req.set_cas_config_opid_index_less_or_equal(*cas_config_index); |
| } |
| |
| DeleteTabletResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(ts->tserver_admin_proxy->DeleteTablet(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| if (error_code) { |
| *error_code = resp.error().code(); |
| } |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status DeleteTabletWithRetries(const TServerDetails* ts, |
| const string& tablet_id, |
| TabletDataState delete_type, |
| const MonoDelta& timeout, |
| const optional<int64_t>& cas_config_index) { |
| const MonoTime deadline = MonoTime::Now() + timeout; |
| Status s; |
| while (true) { |
| s = DeleteTablet(ts, tablet_id, delete_type, timeout, cas_config_index); |
| if (s.ok()) { |
| return s; |
| } |
| if (deadline < MonoTime::Now()) { |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| return s; |
| } |
| |
| Status StartTabletCopy(const TServerDetails* ts, |
| const string& tablet_id, |
| const string& copy_source_uuid, |
| const HostPort& copy_source_addr, |
| int64_t caller_term, |
| const MonoDelta& timeout, |
| tserver::TabletServerErrorPB::Code* error_code) { |
| consensus::StartTabletCopyRequestPB req; |
| consensus::StartTabletCopyResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| |
| req.set_dest_uuid(ts->uuid()); |
| req.set_tablet_id(tablet_id); |
| req.set_copy_peer_uuid(copy_source_uuid); |
| *req.mutable_copy_peer_addr() = HostPortToPB(copy_source_addr); |
| req.set_caller_term(caller_term); |
| |
| RETURN_NOT_OK(ts->consensus_proxy->StartTabletCopy(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| CHECK(resp.error().has_code()) << "Tablet copy error response has no code"; |
| CHECK(tserver::TabletServerErrorPB::Code_IsValid(resp.error().code())) |
| << "Tablet copy error response code is not valid"; |
| if (error_code) { |
| *error_code = resp.error().code(); |
| } |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status BeginTabletCopySession(const TServerDetails* ts, |
| const string& tablet_id, |
| const string& caller_uuid, |
| const MonoDelta& timeout, |
| TabletCopyErrorPB::Code* error_code) { |
| BeginTabletCopySessionRequestPB req; |
| BeginTabletCopySessionResponsePB resp; |
| req.set_tablet_id(tablet_id); |
| req.set_requestor_uuid(caller_uuid); |
| |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| |
| RETURN_NOT_OK(ts->tablet_copy_proxy->BeginTabletCopySession(req, &resp, &rpc)); |
| if (rpc.error_response()) { |
| const TabletCopyErrorPB& error = |
| rpc.error_response()->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext); |
| if (error_code) *error_code = error.code(); |
| return StatusFromPB(error.status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status GetInt64Metric(const HostPort& http_hp, |
| const MetricEntityPrototype* entity_proto, |
| const char* entity_id, |
| const MetricPrototype* metric_proto, |
| const char* value_field, |
| int64_t* value) { |
| *value = 0; |
| bool found = false; |
| // Fetch metrics whose name matches the given prototype. |
| string url = Substitute( |
| "http://$0/jsonmetricz?metrics=$1", |
| http_hp.ToString(), metric_proto->name()); |
| EasyCurl curl; |
| faststring dst; |
| RETURN_NOT_OK(curl.FetchURL(url, &dst)); |
| |
| // Parse the results, beginning with the top-level entity array. |
| JsonReader r(dst.ToString()); |
| RETURN_NOT_OK(r.Init()); |
| vector<const Value*> entities; |
| RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities)); |
| for (const Value* entity : entities) { |
| // Find the desired entity. |
| string type; |
| RETURN_NOT_OK(r.ExtractString(entity, "type", &type)); |
| if (type != entity_proto->name()) { |
| continue; |
| } |
| if (entity_id) { |
| string id; |
| RETURN_NOT_OK(r.ExtractString(entity, "id", &id)); |
| if (!MatchPattern(id, entity_id)) { |
| continue; |
| } |
| } |
| |
| // Find the desired metric within the entity. |
| vector<const Value*> metrics; |
| RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metrics)); |
| for (const Value* metric : metrics) { |
| string name; |
| RETURN_NOT_OK(r.ExtractString(metric, "name", &name)); |
| if (name != metric_proto->name()) { |
| continue; |
| } |
| |
| int64_t v = 0; |
| RETURN_NOT_OK(r.ExtractInt64(metric, value_field, &v)); |
| found = true; |
| *value += v; |
| if (!entity_id) { |
| return Status::OK(); |
| } |
| } |
| } |
| if (found) { |
| return Status::OK(); |
| } |
| string msg; |
| if (entity_id) { |
| msg = Substitute("Could not find metric $0.$1 for entity $2", |
| entity_proto->name(), metric_proto->name(), |
| entity_id); |
| } else { |
| msg = Substitute("Could not find metric $0.$1", |
| entity_proto->name(), metric_proto->name()); |
| } |
| return Status::NotFound(msg); |
| } |
| |
| Status GetMasterRegistration(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const MonoDelta& timeout, |
| master::GetMasterRegistrationResponsePB* registration) { |
| master::GetMasterRegistrationRequestPB req; |
| RpcController rpc; |
| rpc.set_timeout(timeout); |
| RETURN_NOT_OK(master_proxy->GetMasterRegistration(req, registration, &rpc)); |
| if (registration->has_error()) { |
| return StatusFromPB(registration->error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status GetClusterId(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const MonoDelta& timeout, |
| string* cluster_id) { |
| master::GetMasterRegistrationResponsePB registration; |
| RETURN_NOT_OK(GetMasterRegistration(master_proxy, timeout, ®istration)); |
| CHECK(registration.has_cluster_id()); |
| *cluster_id = registration.cluster_id(); |
| return Status::OK(); |
| } |
| |
| Status GetTsCounterValue(ExternalTabletServer* ets, |
| MetricPrototype* metric, |
| int64_t* value) { |
| return GetInt64Metric(ets->bound_http_hostport(), |
| &METRIC_ENTITY_server, |
| "kudu.tabletserver", |
| metric, |
| "value", |
| value); |
| } |
| |
| Status AlterTableName(const shared_ptr<MasterServiceProxy>& master_proxy, |
| const string& table_id, |
| const string& old_table_name, |
| const string& new_table_name, |
| const MonoDelta& timeout) { |
| master::AlterTableRequestPB req; |
| req.mutable_table()->set_table_id(table_id); |
| req.mutable_table()->set_table_name(old_table_name); |
| req.set_new_table_name(new_table_name); |
| master::AlterTableResponsePB resp; |
| RpcController controller; |
| controller.set_timeout(timeout); |
| |
| RETURN_NOT_OK(master_proxy->AlterTable(req, &resp, &controller)); |
| RETURN_NOT_OK(controller.status()); |
| if (resp.has_error()) { |
| return Status::RemoteError("Response had an error", SecureShortDebugString(resp.error())); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status DeleteTable(const std::shared_ptr<master::MasterServiceProxy>& master_proxy, |
| const std::string& table_id, |
| const std::string& table_name, |
| const MonoDelta& timeout) { |
| master::DeleteTableRequestPB req; |
| req.mutable_table()->set_table_id(table_id); |
| req.mutable_table()->set_table_name(table_name); |
| master::DeleteTableResponsePB resp; |
| RpcController controller; |
| controller.set_timeout(timeout); |
| |
| RETURN_NOT_OK(master_proxy->DeleteTable(req, &resp, &controller)); |
| RETURN_NOT_OK(controller.status()); |
| if (resp.has_error()) { |
| return Status::RemoteError("Response had an error", SecureShortDebugString(resp.error())); |
| } |
| |
| return Status::OK(); |
| } |
| |
| } // namespace itest |
| } // namespace kudu |