blob: e54728a96542e44801d27d02bfc766936bb40e4b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/integration-tests/cluster_itest_util.h"
#include <algorithm>
#include <boost/optional.hpp>
#include <glog/stl_logging.h>
#include <limits>
#include "kudu/client/client.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.proxy.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tserver/tablet_server_test_util.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/net/net_util.h"
namespace kudu {
namespace itest {
using client::KuduClient;
using client::KuduSchema;
using client::KuduSchemaBuilder;
using client::KuduTable;
using consensus::CONSENSUS_CONFIG_ACTIVE;
using consensus::CONSENSUS_CONFIG_COMMITTED;
using consensus::ChangeConfigRequestPB;
using consensus::ChangeConfigResponsePB;
using consensus::ConsensusConfigType;
using consensus::ConsensusStatePB;
using consensus::CountVoters;
using consensus::GetConsensusStateRequestPB;
using consensus::GetConsensusStateResponsePB;
using consensus::GetLastOpIdRequestPB;
using consensus::GetLastOpIdResponsePB;
using consensus::LeaderStepDownRequestPB;
using consensus::LeaderStepDownResponsePB;
using consensus::OpId;
using consensus::RaftPeerPB;
using consensus::RunLeaderElectionResponsePB;
using consensus::RunLeaderElectionRequestPB;
using consensus::kInvalidOpIdIndex;
using master::ListTabletServersResponsePB;
using master::MasterServiceProxy;
using master::TabletLocationsPB;
using rpc::Messenger;
using rpc::RpcController;
using std::min;
using std::shared_ptr;
using std::string;
using std::unordered_map;
using std::vector;
using strings::Substitute;
using tserver::CreateTsClientProxies;
using tserver::ListTabletsResponsePB;
using tserver::DeleteTabletRequestPB;
using tserver::DeleteTabletResponsePB;
using tserver::TabletServerAdminServiceProxy;
using tserver::TabletServerErrorPB;
using tserver::TabletServerServiceProxy;
using tserver::WriteRequestPB;
using tserver::WriteResponsePB;
// Returns the permanent UUID stored in this server's instance id. The result
// is a reference into 'instance_id', valid as long as this object is.
const string& TServerDetails::uuid() const {
  const string& permanent_uuid = instance_id.permanent_uuid();
  return permanent_uuid;
}
// Human-readable description of this server: its permanent UUID followed by
// the short debug form of the first RPC address in its registration.
string TServerDetails::ToString() const {
  const string& uuid_str = instance_id.permanent_uuid();
  const string first_rpc_addr = registration.rpc_addresses(0).ShortDebugString();
  return Substitute("TabletServer: $0, Rpc address: $1", uuid_str, first_rpc_addr);
}
// Builds a client-side schema with a single column, "key": a non-null INT32
// primary key. CHECK-fails if the schema builder rejects the definition.
client::KuduSchema SimpleIntKeyKuduSchema() {
  KuduSchemaBuilder builder;
  builder.AddColumn("key")
      ->Type(client::KuduColumnSchema::INT32)
      ->NotNull()
      ->PrimaryKey();
  KuduSchema schema;
  CHECK_OK(builder.Build(&schema));
  return schema;
}
// Asks each server in 'replicas', in order, for the last OpId of kind
// 'opid_type' it has for 'tablet_id', appending the answers to 'op_ids'
// (which is cleared first). Every RPC individually uses the full 'timeout'.
//
// Returns the first RPC failure, prefixed with the failing server's instance
// id; in that case 'op_ids' contains only the answers gathered before it.
Status GetLastOpIdForEachReplica(const string& tablet_id,
                                 const vector<TServerDetails*>& replicas,
                                 consensus::OpIdType opid_type,
                                 const MonoDelta& timeout,
                                 vector<OpId>* op_ids) {
  op_ids->clear();

  // The tablet id and opid type are the same for every replica; only the
  // destination uuid changes per request.
  GetLastOpIdRequestPB req;
  req.set_tablet_id(tablet_id);
  req.set_opid_type(opid_type);

  GetLastOpIdResponsePB resp;
  RpcController rpc;
  for (TServerDetails* ts : replicas) {
    rpc.Reset();
    rpc.set_timeout(timeout);
    resp.Clear();
    req.set_dest_uuid(ts->uuid());
    Status rpc_status = ts->consensus_proxy->GetLastOpId(req, &resp, &rpc);
    RETURN_NOT_OK_PREPEND(rpc_status,
                          Substitute("Failed to fetch last op id from $0",
                                     ts->instance_id.ShortDebugString()));
    op_ids->push_back(resp.opid());
  }
  return Status::OK();
}
// Single-replica convenience wrapper around GetLastOpIdForEachReplica():
// stores the last OpId of kind 'opid_type' that 'replica' reports for
// 'tablet_id' into 'op_id'. Propagates any RPC error unchanged.
Status GetLastOpIdForReplica(const std::string& tablet_id,
                             TServerDetails* replica,
                             consensus::OpIdType opid_type,
                             const MonoDelta& timeout,
                             consensus::OpId* op_id) {
  const vector<TServerDetails*> single_replica = { replica };
  vector<OpId> results;
  RETURN_NOT_OK(GetLastOpIdForEachReplica(tablet_id, single_replica, opid_type, timeout,
                                          &results));
  CHECK_EQ(1, results.size());
  *op_id = results.front();
  return Status::OK();
}
// Waits until every server in 'tablet_servers' reports the same last-received
// (RECEIVED_OPID) index for 'tablet_id', and that common index is at least
// 'minimum_index'. An empty 'tablet_servers' map is trivially converged.
//
// Polls with a linearly growing sleep (100ms, 200ms, ..., capped at 1s). The
// RPCs of each poll are bounded by the time remaining until the deadline
// instead of the full 'timeout', so a slow or hung server cannot push the
// total wait far past the deadline.
//
// Returns OK once converged, or TimedOut if the deadline passes first.
Status WaitForServersToAgree(const MonoDelta& timeout,
                             const TabletServerMap& tablet_servers,
                             const string& tablet_id,
                             int64_t minimum_index) {
  MonoTime now = MonoTime::Now(MonoTime::COARSE);
  MonoTime deadline = now;
  deadline.AddDelta(timeout);
  for (int i = 1; now.ComesBefore(deadline); i++) {
    vector<TServerDetails*> servers;
    AppendValuesFromMap(tablet_servers, &servers);
    vector<OpId> ids;
    // The loop condition guarantees 'now' is before 'deadline', so this is positive.
    const MonoDelta remaining = deadline.GetDeltaSince(now);
    Status s = GetLastOpIdForEachReplica(tablet_id, servers, consensus::RECEIVED_OPID,
                                         remaining, &ids);
    if (s.ok()) {
      bool any_behind = false;
      bool any_disagree = false;
      if (!ids.empty()) {
        // Use the first replica's index as the reference directly, rather than
        // a sentinel value that could itself appear in a response and cause
        // that replica to be skipped in the agreement check.
        const int64_t reference_index = ids[0].index();
        for (const OpId& id : ids) {
          if (id.index() != reference_index) {
            any_disagree = true;
            break;
          }
          if (id.index() < minimum_index) {
            any_behind = true;
            break;
          }
        }
      }
      if (!any_behind && !any_disagree) {
        return Status::OK();
      }
    } else {
      LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString();
    }
    LOG(INFO) << "Not converged past " << minimum_index << " yet: " << ids;
    SleepFor(MonoDelta::FromMilliseconds(min(i * 100, 1000)));
    now = MonoTime::Now(MonoTime::COARSE);
  }
  return Status::TimedOut(Substitute("Index $0 not available on all replicas after $1. ",
                                     minimum_index, timeout.ToString()));
}
// Wait until all specified replicas have logged the given index.
//
// Polls every 50ms, asking each server in 'replicas' for its last received
// (RECEIVED_OPID) OpId for 'tablet_id', and succeeds once every reported index
// is >= 'log_index'. RPC errors are logged and the poll is retried; each RPC
// uses the full 'timeout'. On timeout, the returned TimedOut status names the
// replicas that were being waited on.
Status WaitUntilAllReplicasHaveOp(const int64_t log_index,
                                  const string& tablet_id,
                                  const vector<TServerDetails*>& replicas,
                                  const MonoDelta& timeout) {
  MonoTime start = MonoTime::Now(MonoTime::FINE);
  MonoDelta passed = MonoDelta::FromMilliseconds(0);
  while (true) {
    vector<OpId> op_ids;
    Status s = GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID, timeout,
                                         &op_ids);
    if (s.ok()) {
      bool any_behind = false;
      for (const OpId& op_id : op_ids) {
        if (op_id.index() < log_index) {
          any_behind = true;
          break;
        }
      }
      if (!any_behind) return Status::OK();
    } else {
      LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString();
    }
    passed = MonoTime::Now(MonoTime::FINE).GetDeltaSince(start);
    if (passed.MoreThan(timeout)) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(50));
  }
  string replicas_str;
  for (const TServerDetails* replica : replicas) {
    if (!replicas_str.empty()) replicas_str += ", ";
    replicas_str += "{ " + replica->ToString() + " }";
  }
  // The format string has three placeholders; 'replicas_str' fills $2.
  return Status::TimedOut(Substitute("Index $0 not available on all replicas after $1. "
                                     "Replicas: [ $2 ]",
                                     log_index, passed.ToString(), replicas_str));
}
// Lists the tablet servers registered with the master behind 'master_proxy'
// and fills 'ts_map' with one newly allocated TServerDetails per server, keyed
// by permanent UUID. Each entry's client proxies are built on 'messenger' for
// the first address that the server's first registered RPC address resolves to.
//
// 'ts_map' is cleared first; because its values are raw pointers, entries that
// were present before the call are dropped without being freed.
// NOTE(review): presumably the caller owns and deletes the inserted values;
// confirm against callers.
//
// Returns an error if the master RPC fails, the response carries an error, or
// a server has no RPC address or its address cannot be resolved. On such an
// error 'ts_map' may already hold entries for earlier servers.
Status CreateTabletServerMap(MasterServiceProxy* master_proxy,
                             const shared_ptr<Messenger>& messenger,
                             unordered_map<string, TServerDetails*>* ts_map) {
  master::ListTabletServersRequestPB req;
  master::ListTabletServersResponsePB resp;
  rpc::RpcController controller;
  RETURN_NOT_OK(master_proxy->ListTabletServers(req, &resp, &controller));
  RETURN_NOT_OK(controller.status());
  if (resp.has_error()) {
    return Status::RemoteError("Response had an error", resp.error().ShortDebugString());
  }
  ts_map->clear();
  for (const ListTabletServersResponsePB::Entry& entry : resp.servers()) {
    // Guard the rpc_addresses(0) access below.
    if (entry.registration().rpc_addresses_size() == 0) {
      return Status::IllegalState("Tablet server has no registered RPC address",
                                  entry.ShortDebugString());
    }
    HostPort host_port;
    RETURN_NOT_OK(HostPortFromPB(entry.registration().rpc_addresses(0), &host_port));
    vector<Sockaddr> addresses;
    // Resolution can fail or produce nothing; either way addresses[0] below
    // would be out of bounds, so surface it as an error instead.
    RETURN_NOT_OK_PREPEND(host_port.ResolveAddresses(&addresses),
                          Substitute("Unable to resolve tablet server address $0",
                                     host_port.ToString()));
    if (addresses.empty()) {
      return Status::NotFound("Tablet server address resolved to no addresses",
                              host_port.ToString());
    }
    gscoped_ptr<TServerDetails> peer(new TServerDetails);
    peer->instance_id.CopyFrom(entry.instance_id());
    peer->registration.CopyFrom(entry.registration());
    CreateTsClientProxies(addresses[0],
                          messenger,
                          &peer->tserver_proxy,
                          &peer->tserver_admin_proxy,
                          &peer->consensus_proxy,
                          &peer->generic_proxy);
    InsertOrDie(ts_map, peer->instance_id.permanent_uuid(), peer.get());
    // Ownership now lives in 'ts_map'.
    ignore_result(peer.release());
  }
  return Status::OK();
}
// Issues a GetConsensusState RPC for 'tablet_id' to 'replica', asking for the
// config of the given 'type' (e.g. CONSENSUS_CONFIG_COMMITTED or _ACTIVE).
// On success, copies the returned state into 'consensus_state'.
// RPC-level failures are returned as-is; an error carried in the response is
// converted with StatusFromPB() and 'consensus_state' is left untouched.
Status GetConsensusState(const TServerDetails* replica,
                         const string& tablet_id,
                         consensus::ConsensusConfigType type,
                         const MonoDelta& timeout,
                         ConsensusStatePB* consensus_state) {
  GetConsensusStateRequestPB request;
  request.set_dest_uuid(replica->uuid());
  request.set_tablet_id(tablet_id);
  request.set_type(type);

  RpcController rpc;
  rpc.set_timeout(timeout);

  GetConsensusStateResponsePB response;
  RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(request, &response, &rpc));
  if (!response.has_error()) {
    *consensus_state = response.cstate();
    return Status::OK();
  }
  return StatusFromPB(response.error().status());
}
// Polls 'replica' until the committed Raft config of 'tablet_id' contains
// exactly 'config_size' voters, or 'timeout' elapses.
//
// Sleeps with exponential backoff between polls (1ms, 2ms, ... capped at
// 2^7 = 128ms). Each RPC is bounded by the time remaining until the deadline.
// On timeout, the returned status includes the last consensus state and the
// last RPC status observed.
Status WaitUntilCommittedConfigNumVotersIs(int config_size,
                                           const TServerDetails* replica,
                                           const std::string& tablet_id,
                                           const MonoDelta& timeout) {
  MonoTime start = MonoTime::Now(MonoTime::FINE);
  MonoTime deadline = start;
  deadline.AddDelta(timeout);
  int backoff_exp = 0;
  const int kMaxBackoffExp = 7;
  Status s;
  ConsensusStatePB cstate;
  while (true) {
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
    s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
                          remaining_timeout, &cstate);
    if (s.ok()) {
      if (CountVoters(cstate.config()) == config_size) {
        return Status::OK();
      }
    }
    if (MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).MoreThan(timeout)) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
    backoff_exp = min(backoff_exp + 1, kMaxBackoffExp);
  }
  // Note the trailing space after "$1." so the two literals do not run together.
  return Status::TimedOut(Substitute("Number of voters does not equal $0 after waiting for $1. "
                                     "Last consensus state: $2. Last status: $3",
                                     config_size, timeout.ToString(),
                                     cstate.ShortDebugString(), s.ToString()));
}
// Polls 'replica' every 10ms until the committed Raft config of 'tablet_id'
// has config opid_index equal to 'opid_index', or 'timeout' elapses. Each RPC
// is bounded by the time remaining until the deadline. On timeout, the status
// reports the actual elapsed time, the last consensus state and last RPC status.
Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index,
                                           const TServerDetails* replica,
                                           const std::string& tablet_id,
                                           const MonoDelta& timeout) {
  const MonoTime start = MonoTime::Now(MonoTime::FINE);
  MonoTime deadline = start;
  deadline.AddDelta(timeout);
  auto elapsed = [&start]() {
    return MonoTime::Now(MonoTime::FINE).GetDeltaSince(start);
  };

  Status s;
  ConsensusStatePB cstate;
  for (;;) {
    const MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
    s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
                          remaining_timeout, &cstate);
    const bool reached = s.ok() && cstate.config().opid_index() == opid_index;
    if (reached) {
      return Status::OK();
    }
    if (elapsed().MoreThan(timeout)) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  return Status::TimedOut(Substitute("Committed config opid_index does not equal $0 "
                                     "after waiting for $1. "
                                     "Last consensus state: $2. Last status: $3",
                                     opid_index,
                                     elapsed().ToString(),
                                     cstate.ShortDebugString(), s.ToString()));
}
// Polls 'replica' every 10ms until its last COMMITTED_OPID for 'tablet_id' has
// index exactly 'opid_index', or 'timeout' elapses. Each RPC is bounded by the
// time remaining until the deadline. On timeout, the status reports the actual
// elapsed time, the last OpId seen and the last RPC status.
Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index,
                                     TServerDetails* replica,
                                     const string& tablet_id,
                                     const MonoDelta& timeout) {
  const MonoTime start = MonoTime::Now(MonoTime::FINE);
  MonoTime deadline = start;
  deadline.AddDelta(timeout);
  auto elapsed = [&start]() {
    return MonoTime::Now(MonoTime::FINE).GetDeltaSince(start);
  };

  Status s;
  OpId last_committed;
  for (;;) {
    const MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
    s = GetLastOpIdForReplica(tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout,
                              &last_committed);
    const bool reached = s.ok() && last_committed.index() == opid_index;
    if (reached) {
      return Status::OK();
    }
    if (elapsed().MoreThan(timeout)) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  return Status::TimedOut(Substitute("Committed consensus opid_index does not equal $0 "
                                     "after waiting for $1. Last opid: $2. Last status: $3",
                                     opid_index,
                                     elapsed().ToString(),
                                     OpIdToString(last_committed),
                                     s.ToString()));
}
// Checks whether 'replica' currently considers itself leader of 'tablet_id',
// based on the leader uuid in its ACTIVE consensus state.
//
// Returns OK if it is the leader, NotFound (wrapping the RPC error) if the
// consensus state could not be fetched, and IllegalState if the replica
// answered but is not the leader.
Status GetReplicaStatusAndCheckIfLeader(const TServerDetails* replica,
                                        const string& tablet_id,
                                        const MonoDelta& timeout) {
  const string& replica_uuid = replica->instance_id.permanent_uuid();
  ConsensusStatePB cstate;
  const Status rpc_status = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_ACTIVE,
                                              timeout, &cstate);
  if (PREDICT_FALSE(!rpc_status.ok())) {
    VLOG(1) << "Error getting consensus state from replica: " << replica_uuid;
    return Status::NotFound("Error connecting to replica", rpc_status.ToString());
  }
  const bool is_leader = cstate.has_leader_uuid() && cstate.leader_uuid() == replica_uuid;
  if (!is_leader) {
    VLOG(1) << "Replica not leader of config: " << replica_uuid;
    return Status::IllegalState("Replica found but not leader");
  }
  return Status::OK();
}
// Polls until 'replica' reports itself as leader of 'tablet_id', or 'timeout'
// elapses. Sleeps with exponential backoff between polls (1ms, 2ms, ...
// capped at 128ms); each check is bounded by the time left until the
// deadline. On timeout, the status includes the last check's result.
Status WaitUntilLeader(const TServerDetails* replica,
                       const string& tablet_id,
                       const MonoDelta& timeout) {
  const int kMaxBackoffExp = 7;
  const MonoTime start = MonoTime::Now(MonoTime::FINE);
  MonoTime deadline = start;
  deadline.AddDelta(timeout);

  Status s;
  for (int backoff_exp = 0; ; backoff_exp = min(backoff_exp + 1, kMaxBackoffExp)) {
    const MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
    s = GetReplicaStatusAndCheckIfLeader(replica, tablet_id, remaining_timeout);
    if (s.ok()) {
      return Status::OK();
    }
    if (MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).MoreThan(timeout)) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
  }
  return Status::TimedOut(Substitute("Replica $0 is not leader after waiting for $1: $2",
                                     replica->ToString(), timeout.ToString(), s.ToString()));
}
// Finds which server in 'tablet_servers' is currently leader of 'tablet_id'.
//
// Queries the servers round-robin, sleeping 10ms after each full pass, until
// one reports itself as leader (stored into '*leader') or 'timeout' elapses.
// Each query is bounded by the time remaining until the deadline.
//
// Returns NotFound immediately if 'tablet_servers' is empty (there is nothing
// to query, and indexing/modulo by zero would be undefined), otherwise
// TimedOut with the last query's status if no leader is found in time.
Status FindTabletLeader(const TabletServerMap& tablet_servers,
                        const string& tablet_id,
                        const MonoDelta& timeout,
                        TServerDetails** leader) {
  vector<TServerDetails*> tservers;
  AppendValuesFromMap(tablet_servers, &tservers);
  if (tservers.empty()) {
    return Status::NotFound(Substitute("Unable to find leader of tablet $0: "
                                       "no tablet servers to query", tablet_id));
  }
  MonoTime start = MonoTime::Now(MonoTime::FINE);
  MonoTime deadline = start;
  deadline.AddDelta(timeout);
  Status s;
  size_t i = 0;
  while (true) {
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
    s = GetReplicaStatusAndCheckIfLeader(tservers[i], tablet_id, remaining_timeout);
    if (s.ok()) {
      *leader = tservers[i];
      return Status::OK();
    }
    if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) break;
    i = (i + 1) % tservers.size();
    // Only back off once every server has been asked in this pass.
    if (i == 0) {
      SleepFor(MonoDelta::FromMilliseconds(10));
    }
  }
  return Status::TimedOut(Substitute("Unable to find leader of tablet $0 after $1. "
                                     "Status message: $2", tablet_id,
                                     MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).ToString(),
                                     s.ToString()));
}
// Asks 'replica' to start a leader election for 'tablet_id' via the
// RunLeaderElection RPC. RPC failures are returned as-is; a response error is
// converted with StatusFromPB() and prefixed with its TabletServerErrorPB code name.
Status StartElection(const TServerDetails* replica,
                     const string& tablet_id,
                     const MonoDelta& timeout) {
  RpcController rpc;
  rpc.set_timeout(timeout);

  RunLeaderElectionRequestPB req;
  req.set_dest_uuid(replica->uuid());
  req.set_tablet_id(tablet_id);

  RunLeaderElectionResponsePB resp;
  RETURN_NOT_OK(replica->consensus_proxy->RunLeaderElection(req, &resp, &rpc));
  if (!resp.has_error()) {
    return Status::OK();
  }
  const TabletServerErrorPB& err = resp.error();
  const string code_prefix = Substitute("Code $0", TabletServerErrorPB::Code_Name(err.code()));
  return StatusFromPB(err.status()).CloneAndPrepend(code_prefix);
}
// Asks 'replica' to step down as leader of 'tablet_id'. If the response
// carries an error, the full error is copied into '*error' (when non-null) and
// the converted status is returned, prefixed with the error's code name.
// RPC-level failures are returned as-is and leave '*error' untouched.
Status LeaderStepDown(const TServerDetails* replica,
                      const string& tablet_id,
                      const MonoDelta& timeout,
                      TabletServerErrorPB* error) {
  RpcController rpc;
  rpc.set_timeout(timeout);

  LeaderStepDownRequestPB req;
  req.set_dest_uuid(replica->uuid());
  req.set_tablet_id(tablet_id);

  LeaderStepDownResponsePB resp;
  RETURN_NOT_OK(replica->consensus_proxy->LeaderStepDown(req, &resp, &rpc));
  if (!resp.has_error()) {
    return Status::OK();
  }
  const TabletServerErrorPB& err = resp.error();
  if (error != nullptr) {
    *error = err;
  }
  const string code_prefix = Substitute("Code $0", TabletServerErrorPB::Code_Name(err.code()));
  return StatusFromPB(err.status()).CloneAndPrepend(code_prefix);
}
// Writes a single row, built by AddTestRowToPB() against GetSimpleTestSchema(),
// to 'tablet_id' on 'replica' with operation type 'write_type'. Returns the RPC
// status, or the response's error converted with StatusFromPB().
Status WriteSimpleTestRow(const TServerDetails* replica,
                          const std::string& tablet_id,
                          RowOperationsPB::Type write_type,
                          int32_t key,
                          int32_t int_val,
                          const string& string_val,
                          const MonoDelta& timeout) {
  Schema schema = GetSimpleTestSchema();

  WriteRequestPB req;
  req.set_tablet_id(tablet_id);
  RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
  AddTestRowToPB(write_type, schema, key, int_val, string_val, req.mutable_row_operations());

  RpcController rpc;
  rpc.set_timeout(timeout);
  WriteResponsePB resp;
  RETURN_NOT_OK(replica->tserver_proxy->Write(req, &resp, &rpc));
  return resp.has_error() ? StatusFromPB(resp.error().status()) : Status::OK();
}
// Sends an ADD_SERVER config change for 'tablet_id' to 'leader', adding
// 'replica_to_add' with membership 'member_type' and its first registered RPC
// address. If 'cas_config_opid_index' is set, it is forwarded as the request's
// cas_config_opid_index. On a response error, its code is stored in
// '*error_code' (when non-null) and the converted status is returned.
Status AddServer(const TServerDetails* leader,
                 const std::string& tablet_id,
                 const TServerDetails* replica_to_add,
                 consensus::RaftPeerPB::MemberType member_type,
                 const boost::optional<int64_t>& cas_config_opid_index,
                 const MonoDelta& timeout,
                 TabletServerErrorPB::Code* error_code) {
  ChangeConfigRequestPB req;
  req.set_dest_uuid(leader->uuid());
  req.set_tablet_id(tablet_id);
  req.set_type(consensus::ADD_SERVER);
  if (cas_config_opid_index) {
    req.set_cas_config_opid_index(*cas_config_opid_index);
  }
  RaftPeerPB* new_peer = req.mutable_server();
  new_peer->set_permanent_uuid(replica_to_add->uuid());
  new_peer->set_member_type(member_type);
  new_peer->mutable_last_known_addr()->CopyFrom(replica_to_add->registration.rpc_addresses(0));

  RpcController rpc;
  rpc.set_timeout(timeout);
  ChangeConfigResponsePB resp;
  RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, &resp, &rpc));
  if (!resp.has_error()) {
    return Status::OK();
  }
  if (error_code != nullptr) {
    *error_code = resp.error().code();
  }
  return StatusFromPB(resp.error().status());
}
// Sends a REMOVE_SERVER config change for 'tablet_id' to 'leader', identifying
// the peer to remove only by the permanent UUID of 'replica_to_remove'. If
// 'cas_config_opid_index' is set, it is forwarded as the request's
// cas_config_opid_index. On a response error, its code is stored in
// '*error_code' (when non-null) and the converted status is returned.
Status RemoveServer(const TServerDetails* leader,
                    const std::string& tablet_id,
                    const TServerDetails* replica_to_remove,
                    const boost::optional<int64_t>& cas_config_opid_index,
                    const MonoDelta& timeout,
                    TabletServerErrorPB::Code* error_code) {
  ChangeConfigRequestPB req;
  req.set_dest_uuid(leader->uuid());
  req.set_tablet_id(tablet_id);
  req.set_type(consensus::REMOVE_SERVER);
  if (cas_config_opid_index) {
    req.set_cas_config_opid_index(*cas_config_opid_index);
  }
  req.mutable_server()->set_permanent_uuid(replica_to_remove->uuid());

  RpcController rpc;
  rpc.set_timeout(timeout);
  ChangeConfigResponsePB resp;
  RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, &resp, &rpc));
  if (!resp.has_error()) {
    return Status::OK();
  }
  if (error_code != nullptr) {
    *error_code = resp.error().code();
  }
  return StatusFromPB(resp.error().status());
}
// Fetches the status-and-schema entry of every tablet hosted on 'ts'.
// On success, replaces the contents of 'tablets' with the response entries;
// on failure 'tablets' is left unchanged.
Status ListTablets(const TServerDetails* ts,
                   const MonoDelta& timeout,
                   vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
  RpcController rpc;
  rpc.set_timeout(timeout);
  tserver::ListTabletsRequestPB req;
  tserver::ListTabletsResponsePB resp;
  RETURN_NOT_OK(ts->tserver_proxy->ListTablets(req, &resp, &rpc));
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  const auto& entries = resp.status_and_schema();
  tablets->assign(entries.begin(), entries.end());
  return Status::OK();
}
// Replaces 'tablet_ids' with the ids of the tablets on 'ts' whose state is
// RUNNING, in the order ListTablets() returned them. If listing fails the
// error is returned and 'tablet_ids' is left unchanged.
Status ListRunningTabletIds(const TServerDetails* ts,
                            const MonoDelta& timeout,
                            vector<string>* tablet_ids) {
  vector<ListTabletsResponsePB::StatusAndSchemaPB> all_tablets;
  RETURN_NOT_OK(ListTablets(ts, timeout, &all_tablets));
  tablet_ids->clear();
  for (const auto& entry : all_tablets) {
    const auto& tablet_status = entry.tablet_status();
    if (tablet_status.state() != tablet::RUNNING) {
      continue;
    }
    tablet_ids->push_back(tablet_status.tablet_id());
  }
  return Status::OK();
}
// Asks the master for the locations of the single tablet 'tablet_id' and
// stores them into 'tablet_locations'.
//
// Returns the RPC status, a top-level response error, or the per-tablet error
// if the master reported one. CHECK-fails if the master returns anything other
// than exactly one error or exactly one location entry for the one requested id.
Status GetTabletLocations(const shared_ptr<MasterServiceProxy>& master_proxy,
                          const string& tablet_id,
                          const MonoDelta& timeout,
                          master::TabletLocationsPB* tablet_locations) {
  master::GetTabletLocationsRequestPB req;
  req.add_tablet_ids(tablet_id);

  rpc::RpcController rpc;
  rpc.set_timeout(timeout);
  master::GetTabletLocationsResponsePB resp;
  RETURN_NOT_OK(master_proxy->GetTabletLocations(req, &resp, &rpc));
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  if (resp.errors_size() != 0) {
    CHECK_EQ(1, resp.errors_size()) << resp.ShortDebugString();
    return StatusFromPB(resp.errors(0).status());
  }
  CHECK_EQ(1, resp.tablet_locations_size()) << resp.ShortDebugString();
  *tablet_locations = resp.tablet_locations(0);
  return Status::OK();
}
// Asks the master for the tablet locations of table 'table_name' (requesting
// at most 1000 locations) and writes the raw response into 'table_locations'.
// Returns the RPC status, or the response's error converted with StatusFromPB().
Status GetTableLocations(const shared_ptr<MasterServiceProxy>& master_proxy,
                         const string& table_name,
                         const MonoDelta& timeout,
                         master::GetTableLocationsResponsePB* table_locations) {
  rpc::RpcController rpc;
  rpc.set_timeout(timeout);

  master::GetTableLocationsRequestPB req;
  req.mutable_table()->set_table_name(table_name);
  req.set_max_returned_locations(1000);

  RETURN_NOT_OK(master_proxy->GetTableLocations(req, table_locations, &rpc));
  return table_locations->has_error()
      ? StatusFromPB(table_locations->error().status())
      : Status::OK();
}
// Polls the master every 10ms until its location entry for 'tablet_id' lists
// exactly 'num_voters' replicas in the LEADER or FOLLOWER role, or 'timeout'
// elapses. Each RPC is bounded by the time remaining until the deadline.
//
// Returns the last RPC error if the final poll failed; otherwise IllegalState
// if the voter count never matched.
Status WaitForNumVotersInConfigOnMaster(const shared_ptr<MasterServiceProxy>& master_proxy,
                                        const std::string& tablet_id,
                                        int num_voters,
                                        const MonoDelta& timeout) {
  // Voters are the replicas the master reports as LEADER or FOLLOWER.
  auto count_voters = [](const TabletLocationsPB& locations) {
    int voters = 0;
    for (const TabletLocationsPB::ReplicaPB& r : locations.replicas()) {
      if (r.role() == RaftPeerPB::LEADER || r.role() == RaftPeerPB::FOLLOWER) {
        ++voters;
      }
    }
    return voters;
  };

  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
  deadline.AddDelta(timeout);
  Status s;
  int num_voters_found = 0;
  for (;;) {
    TabletLocationsPB tablet_locations;
    const MonoDelta time_remaining = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
    s = GetTabletLocations(master_proxy, tablet_id, time_remaining, &tablet_locations);
    if (s.ok()) {
      num_voters_found = count_voters(tablet_locations);
      if (num_voters_found == num_voters) {
        break;
      }
    }
    if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  RETURN_NOT_OK(s);
  if (num_voters_found == num_voters) {
    return Status::OK();
  }
  return Status::IllegalState(
      Substitute("Did not find exactly $0 voters, found $1 voters",
                 num_voters, num_voters_found));
}
// Polls ListTablets() on 'ts' every 10ms until it reports exactly 'count'
// tablets (in any state) or 'timeout' elapses. Each individual ListTablets RPC
// uses a fixed 10-second timeout, independent of 'timeout'.
//
// On return, 'tablets' holds the most recent successful listing, if any.
// Returns the last RPC error if the final attempt failed; otherwise
// IllegalState if the tablet count never matched.
Status WaitForNumTabletsOnTS(TServerDetails* ts,
                             int count,
                             const MonoDelta& timeout,
                             vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
  Status s;
  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
  deadline.AddDelta(timeout);
  while (true) {
    s = ListTablets(ts, MonoDelta::FromSeconds(10), tablets);
    // Compare as int to avoid a signed/unsigned comparison against 'count'.
    if (s.ok() && static_cast<int>(tablets->size()) == count) break;
    if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) break;
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  RETURN_NOT_OK(s);
  const int num_found = static_cast<int>(tablets->size());
  if (num_found != count) {
    return Status::IllegalState(
        Substitute("Did not find exactly $0 tablets, found $1 tablets",
                   count, num_found));
  }
  return Status::OK();
}
// Polls ListTablets() on 'ts' every 10ms (each RPC with a fixed 10s timeout)
// until tablet 'tablet_id' is reported in 'state', or 'timeout' elapses.
//
// On timeout, the status names the tablet, server, target state, elapsed
// time, the last state observed for the tablet (UNKNOWN if never seen), and
// the last status: an RPC error, or NotFound if the most recent listing did
// not include the tablet.
Status WaitUntilTabletInState(TServerDetails* ts,
                              const std::string& tablet_id,
                              tablet::TabletStatePB state,
                              const MonoDelta& timeout) {
  const MonoTime start = MonoTime::Now(MonoTime::FINE);
  MonoTime deadline = start;
  deadline.AddDelta(timeout);

  vector<ListTabletsResponsePB::StatusAndSchemaPB> listing;
  Status s;
  tablet::TabletStatePB last_state = tablet::UNKNOWN;
  for (;;) {
    s = ListTablets(ts, MonoDelta::FromSeconds(10), &listing);
    if (s.ok()) {
      bool found = false;
      for (const auto& entry : listing) {
        if (entry.tablet_status().tablet_id() != tablet_id) {
          continue;
        }
        found = true;
        last_state = entry.tablet_status().state();
        if (last_state == state) {
          return Status::OK();
        }
      }
      if (!found) {
        s = Status::NotFound("Tablet " + tablet_id + " not found");
      }
    }
    if (deadline.ComesBefore(MonoTime::Now(MonoTime::FINE))) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  return Status::TimedOut(Substitute("T $0 P $1: Tablet not in $2 state after $3: "
                                     "Tablet state: $4, Status message: $5",
                                     tablet_id, ts->uuid(),
                                     tablet::TabletStatePB_Name(state),
                                     MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).ToString(),
                                     tablet::TabletStatePB_Name(last_state), s.ToString()));
}
// Waits until tablet 'tablet_id' on 'ts' reports the RUNNING state, or
// 'timeout' elapses. Thin wrapper over WaitUntilTabletInState(); returns
// whatever status it returns.
Status WaitUntilTabletRunning(TServerDetails* ts,
                              const std::string& tablet_id,
                              const MonoDelta& timeout) {
  const tablet::TabletStatePB target_state = tablet::RUNNING;
  return WaitUntilTabletInState(ts, tablet_id, target_state, timeout);
}
// Sends a DeleteTablet RPC for 'tablet_id' to the admin service on 'ts' with
// deletion mode 'delete_type'. If 'cas_config_opid_index_less_or_equal' is set,
// it is forwarded in the request (NOTE(review): presumably the server refuses
// the delete when its config opid_index exceeds it; confirm on the server side).
// On a response error, its code is stored in '*error_code' (when non-null) and
// the converted status is returned.
Status DeleteTablet(const TServerDetails* ts,
                    const std::string& tablet_id,
                    const tablet::TabletDataState delete_type,
                    const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
                    const MonoDelta& timeout,
                    tserver::TabletServerErrorPB::Code* error_code) {
  DeleteTabletRequestPB req;
  req.set_dest_uuid(ts->uuid());
  req.set_tablet_id(tablet_id);
  req.set_delete_type(delete_type);
  if (cas_config_opid_index_less_or_equal) {
    req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal);
  }

  RpcController rpc;
  rpc.set_timeout(timeout);
  DeleteTabletResponsePB resp;
  RETURN_NOT_OK(ts->tserver_admin_proxy->DeleteTablet(req, &resp, &rpc));
  if (!resp.has_error()) {
    return Status::OK();
  }
  if (error_code != nullptr) {
    *error_code = resp.error().code();
  }
  return StatusFromPB(resp.error().status());
}
// Asks 'ts' to start remote-bootstrapping 'tablet_id' from the peer
// 'bootstrap_source_uuid' at 'bootstrap_source_addr', passing 'caller_term'
// along in the request. Returns an address-conversion error, the RPC status,
// or the response's error converted with StatusFromPB().
Status StartRemoteBootstrap(const TServerDetails* ts,
                            const string& tablet_id,
                            const string& bootstrap_source_uuid,
                            const HostPort& bootstrap_source_addr,
                            int64_t caller_term,
                            const MonoDelta& timeout) {
  consensus::StartRemoteBootstrapRequestPB req;
  req.set_dest_uuid(ts->uuid());
  req.set_tablet_id(tablet_id);
  req.set_bootstrap_peer_uuid(bootstrap_source_uuid);
  req.set_caller_term(caller_term);
  RETURN_NOT_OK(HostPortToPB(bootstrap_source_addr, req.mutable_bootstrap_peer_addr()));

  RpcController rpc;
  rpc.set_timeout(timeout);
  consensus::StartRemoteBootstrapResponsePB resp;
  RETURN_NOT_OK(ts->consensus_proxy->StartRemoteBootstrap(req, &resp, &rpc));
  return resp.has_error() ? StatusFromPB(resp.error().status()) : Status::OK();
}
} // namespace itest
} // namespace kudu