// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <gmock/gmock.h>
#include "kudu/clock/clock.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/consensus_queue.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/threadpool.h"
#define TOKENPASTE(x, y) x ## y
#define TOKENPASTE2(x, y) TOKENPASTE(x, y)
#define ASSERT_OPID_EQ(left, right) \
do { \
const consensus::OpId& TOKENPASTE2(_left, __LINE__) = (left); \
const consensus::OpId& TOKENPASTE2(_right, __LINE__) = (right); \
if (!consensus::OpIdEquals(TOKENPASTE2(_left, __LINE__), TOKENPASTE2(_right, __LINE__))) { \
FAIL() << "Expected: " \
<< pb_util::SecureShortDebugString(TOKENPASTE2(_left, __LINE__)) << "\n" \
<< "Value: " \
<< pb_util::SecureShortDebugString(TOKENPASTE2(_right, __LINE__)) << "\n"; \
} \
} while (false)
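// Example usage (illustrative sketch; assumes a gtest test body inside
// namespace kudu, using MakeOpId() from opid_util.h):
//
//   consensus::OpId expected = consensus::MakeOpId(1, 1);
//   consensus::OpId actual = consensus::MakeOpId(1, 2);
//   ASSERT_OPID_EQ(expected, actual);  // FAILs, printing both OpIds.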
namespace kudu {
namespace consensus {
inline std::unique_ptr<ReplicateMsg> CreateDummyReplicate(int64_t term,
int64_t index,
const Timestamp& timestamp,
int64_t payload_size) {
std::unique_ptr<ReplicateMsg> msg(new ReplicateMsg);
OpId* id = msg->mutable_id();
id->set_term(term);
id->set_index(index);
msg->set_op_type(NO_OP);
msg->mutable_noop_request()->mutable_payload_for_tests()->resize(payload_size);
msg->set_timestamp(timestamp.ToUint64());
return msg;
}
// Returns a RaftPeerPB with the given UUID and an obviously-fake hostname/port combo.
inline RaftPeerPB FakeRaftPeerPB(const std::string& uuid) {
RaftPeerPB peer_pb;
peer_pb.set_permanent_uuid(uuid);
peer_pb.set_member_type(RaftPeerPB::VOTER);
peer_pb.mutable_last_known_addr()->set_host(strings::Substitute(
"$0-fake-hostname", CURRENT_TEST_NAME()));
peer_pb.mutable_last_known_addr()->set_port(0);
return peer_pb;
}
// Appends 'count' NO_OP messages to 'queue', starting at index 'first'.
// The term of each message is derived from its index (term = index / 7),
// so the term advances as the indexes grow.
inline void AppendReplicateMessagesToQueue(
PeerMessageQueue* queue,
clock::Clock* clock,
int64_t first,
int64_t count,
int64_t payload_size = 0) {
for (int64_t i = first; i < first + count; i++) {
int64_t term = i / 7;
int64_t index = i;
CHECK_OK(queue->AppendOperation(make_scoped_refptr_replicate(
CreateDummyReplicate(term, index, clock->Now(), payload_size).release())));
}
}
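// Example usage (illustrative sketch; assumes 'queue_' is an initialized
// PeerMessageQueue in leader mode and 'clock_' is a clock::Clock):
//
//   // Appends NO_OP messages with indexes 1..10; since term = index / 7,
//   // indexes 1-6 get term 0 and indexes 7-10 get term 1.
//   AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);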
// Builds a configuration of 'num_voters' voters and 'num_non_voters' non-voters.
inline RaftConfigPB BuildRaftConfigPBForTests(int num_voters, int num_non_voters = 0) {
RaftConfigPB raft_config;
for (int i = 0; i < num_voters; i++) {
RaftPeerPB* peer_pb = raft_config.add_peers();
peer_pb->set_member_type(RaftPeerPB::VOTER);
peer_pb->set_permanent_uuid(strings::Substitute("peer-$0", i));
HostPortPB* hp = peer_pb->mutable_last_known_addr();
hp->set_host(strings::Substitute("peer-$0.fake-domain-for-tests", i));
hp->set_port(0);
}
for (int i = 0; i < num_non_voters; i++) {
RaftPeerPB* peer_pb = raft_config.add_peers();
peer_pb->set_member_type(RaftPeerPB::NON_VOTER);
peer_pb->set_permanent_uuid(strings::Substitute("non-voter-peer-$0", i));
HostPortPB* hp = peer_pb->mutable_last_known_addr();
hp->set_host(strings::Substitute("non-voter-peer-$0.fake-domain-for-tests", i));
hp->set_port(0);
}
return raft_config;
}
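// Example usage (illustrative sketch):
//
//   // Voters "peer-0".."peer-2" plus non-voter "non-voter-peer-0".
//   RaftConfigPB config = BuildRaftConfigPBForTests(3, 1);
//   CHECK_EQ(4, config.peers_size());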
// Abstract base class for building test PeerProxy implementations.
// Provides callback registration and dispatch on a caller-supplied thread
// pool, along with an enum identifying the supported methods.
class TestPeerProxy : public PeerProxy {
public:
// Which PeerProxy method to invoke.
enum Method {
kUpdate,
kRequestVote,
};
explicit TestPeerProxy(ThreadPool* pool) : pool_(pool) {}
std::string PeerName() const override {
return "TestPeerProxy";
}
protected:
  // Registers the RPC callback so it can be invoked later.
  // We currently support only one in-flight request per method at a time.
void RegisterCallback(Method method, const rpc::ResponseCallback& callback) {
std::lock_guard<simple_spinlock> lock(lock_);
InsertOrDie(&callbacks_, method, callback);
}
// Answer the peer.
virtual void Respond(Method method) {
rpc::ResponseCallback callback;
{
std::lock_guard<simple_spinlock> lock(lock_);
callback = FindOrDie(callbacks_, method);
CHECK_EQ(1, callbacks_.erase(method));
// Drop the lock before submitting to the pool, since the callback itself may
// destroy this instance.
}
// If the peer has been closed while a response was in-flight, this can
// return a bad Status, but that's fine.
ignore_result(pool_->Submit(std::move(callback)));
}
void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
RegisterCallback(method, callback);
Respond(method);
}
mutable simple_spinlock lock_;
ThreadPool* pool_;
std::map<Method, rpc::ResponseCallback> callbacks_; // Protected by lock_.
};
template <typename ProxyType>
class DelayablePeerProxy : public TestPeerProxy {
public:
  // Adds the ability to delay RPC responses from the delegated implementation.
  // This class takes ownership of 'proxy'.
explicit DelayablePeerProxy(ThreadPool* pool, ProxyType* proxy)
: TestPeerProxy(pool),
proxy_(CHECK_NOTNULL(proxy)),
delay_response_(false),
latch_(1) {
}
  // Delays the response to the next request to this remote peer.
  // The response callback will only be invoked once Respond() is called.
virtual void DelayResponse() {
std::lock_guard<simple_spinlock> l(lock_);
delay_response_ = true;
latch_.Reset(1); // Reset for the next time.
}
void RespondUnlessDelayed(Method method) {
{
std::lock_guard<simple_spinlock> l(lock_);
if (delay_response_) {
latch_.CountDown();
delay_response_ = false;
return;
}
}
TestPeerProxy::Respond(method);
}
void Respond(Method method) override {
latch_.Wait(); // Wait until strictly after peer would have responded.
return TestPeerProxy::Respond(method);
}
void UpdateAsync(const ConsensusRequestPB& request,
ConsensusResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) override {
RegisterCallback(kUpdate, callback);
return proxy_->UpdateAsync(
request, response, controller,
[this]() { this->RespondUnlessDelayed(kUpdate); });
}
void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
RunLeaderElectionResponsePB* /*response*/,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
callback();
}
void RequestConsensusVoteAsync(const VoteRequestPB& request,
VoteResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) override {
RegisterCallback(kRequestVote, callback);
return proxy_->RequestConsensusVoteAsync(
request, response, controller,
[this]() { this->RespondUnlessDelayed(kRequestVote); });
}
ProxyType* proxy() const {
return proxy_.get();
}
protected:
std::unique_ptr<ProxyType> const proxy_;
bool delay_response_; // Protected by lock_.
CountDownLatch latch_;
};
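// Example usage (illustrative sketch; assumes 'proxy' is a
// DelayablePeerProxy<NoOpTestPeerProxy>* wired into the peer under test):
//
//   proxy->DelayResponse();                   // Hold back the next response.
//   // ... trigger an UpdateAsync() through the peer under test ...
//   proxy->Respond(TestPeerProxy::kUpdate);   // Deliver the held response.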
// Allows complete mocking of a peer's responses.
// You set the response, and the proxy replies with it on every call.
class MockedPeerProxy : public TestPeerProxy {
public:
explicit MockedPeerProxy(ThreadPool* pool)
: TestPeerProxy(pool),
update_count_(0) {
}
void set_update_response(const ConsensusResponsePB& update_response) {
CHECK(update_response.IsInitialized()) << pb_util::SecureShortDebugString(update_response);
std::lock_guard<simple_spinlock> l(lock_);
update_response_ = update_response;
}
void set_vote_response(const VoteResponsePB& vote_response) {
std::lock_guard<simple_spinlock> l(lock_);
vote_response_ = vote_response;
}
void UpdateAsync(const ConsensusRequestPB& /*request*/,
ConsensusResponsePB* response,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
{
std::lock_guard<simple_spinlock> l(lock_);
update_count_++;
*response = update_response_;
}
return RegisterCallbackAndRespond(kUpdate, callback);
}
void RequestConsensusVoteAsync(const VoteRequestPB& /*request*/,
VoteResponsePB* response,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
*response = vote_response_;
return RegisterCallbackAndRespond(kRequestVote, callback);
}
void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
RunLeaderElectionResponsePB* /*response*/,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
callback();
}
// Return the number of times that UpdateAsync() has been called.
int update_count() const {
std::lock_guard<simple_spinlock> l(lock_);
return update_count_;
}
protected:
int update_count_;
ConsensusResponsePB update_response_;
VoteResponsePB vote_response_;
};
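// Example usage (illustrative sketch; assumes 'proxy' is a MockedPeerProxy*
// wired into the peer under test; the field values are for demonstration only):
//
//   ConsensusResponsePB resp;
//   resp.set_responder_uuid("peer-1");
//   resp.set_responder_term(1);
//   resp.mutable_status()->mutable_last_received()->CopyFrom(MakeOpId(1, 1));
//   resp.mutable_status()->mutable_last_received_current_leader()->CopyFrom(
//       MakeOpId(1, 1));
//   proxy->set_update_response(resp);
//   // Every subsequent UpdateAsync() call now replies with 'resp'.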
// Allows testing peers by emulating a no-op remote endpoint that simply
// replies that the messages were received/replicated/committed.
class NoOpTestPeerProxy : public TestPeerProxy {
public:
explicit NoOpTestPeerProxy(ThreadPool* pool, consensus::RaftPeerPB peer_pb)
: TestPeerProxy(pool), peer_pb_(std::move(peer_pb)) {
last_received_.CopyFrom(MinimumOpId());
}
void UpdateAsync(const ConsensusRequestPB& request,
ConsensusResponsePB* response,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
response->Clear();
{
std::lock_guard<simple_spinlock> lock(lock_);
if (OpIdLessThan(last_received_, request.preceding_id())) {
ConsensusErrorPB* error = response->mutable_status()->mutable_error();
error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
StatusToPB(Status::IllegalState(""), error->mutable_status());
} else if (request.ops_size() > 0) {
last_received_.CopyFrom(request.ops(request.ops_size() - 1).id());
}
response->set_responder_uuid(peer_pb_.permanent_uuid());
response->set_responder_term(request.caller_term());
response->mutable_status()->mutable_last_received()->CopyFrom(last_received_);
response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(last_received_);
      // We set the last committed index to the same index as the last received. While
      // this is unlikely to happen in a real situation, it's not technically incorrect
      // and avoids having to come up with some other index that is still correct.
response->mutable_status()->set_last_committed_idx(last_received_.index());
}
return RegisterCallbackAndRespond(kUpdate, callback);
}
void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
RunLeaderElectionResponsePB* /*response*/,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
callback();
}
void RequestConsensusVoteAsync(const VoteRequestPB& request,
VoteResponsePB* response,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
{
std::lock_guard<simple_spinlock> lock(lock_);
response->set_responder_uuid(peer_pb_.permanent_uuid());
response->set_responder_term(request.candidate_term());
response->set_vote_granted(true);
}
return RegisterCallbackAndRespond(kRequestVote, callback);
}
  OpId last_received() {
    std::lock_guard<simple_spinlock> lock(lock_);
    // Return by value: a returned reference would outlive the lock.
    return last_received_;
  }
private:
const consensus::RaftPeerPB peer_pb_;
ConsensusStatusPB last_status_; // Protected by lock_.
OpId last_received_; // Protected by lock_.
};
class NoOpTestPeerProxyFactory : public PeerProxyFactory {
public:
NoOpTestPeerProxyFactory() {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
CHECK_OK(rpc::MessengerBuilder("test").Build(&messenger_));
}
Status NewProxy(const consensus::RaftPeerPB& peer_pb,
std::unique_ptr<PeerProxy>* proxy) override {
proxy->reset(new NoOpTestPeerProxy(pool_.get(), peer_pb));
return Status::OK();
}
const std::shared_ptr<rpc::Messenger>& messenger() const override {
return messenger_;
}
private:
std::unique_ptr<ThreadPool> pool_;
std::shared_ptr<rpc::Messenger> messenger_;
};
typedef std::unordered_map<std::string, std::shared_ptr<RaftConsensus>> TestPeerMap;
// Thread-safe manager for list of peers being used in tests.
class TestPeerMapManager {
public:
explicit TestPeerMapManager(RaftConfigPB config) : config_(std::move(config)) {}
void AddPeer(const std::string& peer_uuid, const std::shared_ptr<RaftConsensus>& peer) {
std::lock_guard<simple_spinlock> lock(lock_);
InsertOrDie(&peers_, peer_uuid, peer);
}
Status GetPeerByIdx(int idx, std::shared_ptr<RaftConsensus>* peer_out) const {
CHECK_LT(idx, config_.peers_size());
return GetPeerByUuid(config_.peers(idx).permanent_uuid(), peer_out);
}
Status GetPeerByUuid(const std::string& peer_uuid,
std::shared_ptr<RaftConsensus>* peer_out) const {
std::lock_guard<simple_spinlock> lock(lock_);
if (!FindCopy(peers_, peer_uuid, peer_out)) {
return Status::NotFound("Other consensus instance was destroyed");
}
return Status::OK();
}
void RemovePeer(const std::string& peer_uuid) {
std::lock_guard<simple_spinlock> lock(lock_);
peers_.erase(peer_uuid);
}
TestPeerMap GetPeerMapCopy() const {
std::lock_guard<simple_spinlock> lock(lock_);
return peers_;
}
  void Clear() {
    // Hold a copy of the peers until after the lock is released so that
    // there's still a reference to each peer when 'peers_' is cleared. If we
    // dropped the last reference to a peer while holding the lock we might
    // deadlock: on shutdown, consensus indirectly destroys the test proxies,
    // which in turn reach back into this class.
    TestPeerMap copy;
    {
      std::lock_guard<simple_spinlock> lock(lock_);
      copy.swap(peers_);
    }
  }
private:
const RaftConfigPB config_;
TestPeerMap peers_;
mutable simple_spinlock lock_;
};
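// Example usage (illustrative sketch; 'consensus' is a previously created
// std::shared_ptr<RaftConsensus> for "peer-0"):
//
//   TestPeerMapManager peers(BuildRaftConfigPBForTests(3));
//   peers.AddPeer("peer-0", consensus);
//   std::shared_ptr<RaftConsensus> peer;
//   CHECK_OK(peers.GetPeerByIdx(0, &peer));  // Looks up "peer-0" by config index.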
// Allows testing remote peers by emulating an RPC.
// Both the "remote" peer's RPC call and the caller peer's response are executed
// asynchronously in a ThreadPool.
class LocalTestPeerProxy : public TestPeerProxy {
public:
LocalTestPeerProxy(std::string peer_uuid, ThreadPool* pool,
TestPeerMapManager* peers)
: TestPeerProxy(pool),
peer_uuid_(std::move(peer_uuid)),
peers_(peers),
miss_comm_(false) {}
void UpdateAsync(const ConsensusRequestPB& request,
ConsensusResponsePB* response,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
RegisterCallback(kUpdate, callback);
CHECK_OK(pool_->Submit([=]() { this->SendUpdateRequest(request, response); }));
}
void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
RunLeaderElectionResponsePB* /*response*/,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
callback();
}
void RequestConsensusVoteAsync(const VoteRequestPB& request,
VoteResponsePB* response,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& callback) override {
RegisterCallback(kRequestVote, callback);
CHECK_OK(pool_->Submit([=]() { this->SendVoteRequest(request, response); }));
}
template<class Response>
void SetResponseError(const Status& status, Response* response) {
tserver::TabletServerErrorPB* error = response->mutable_error();
error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR);
StatusToPB(status, error->mutable_status());
}
template<class Request, class Response>
void RespondOrMissResponse(const Request& request,
const Response& response_temp,
Response* final_response,
Method method) {
bool miss_comm_copy;
{
std::lock_guard<simple_spinlock> lock(lock_);
miss_comm_copy = miss_comm_;
miss_comm_ = false;
}
if (PREDICT_FALSE(miss_comm_copy)) {
VLOG(2) << this << ": injecting fault on " << pb_util::SecureShortDebugString(request);
SetResponseError(Status::IOError("Artificial error caused by communication "
"failure injection."), final_response);
} else {
final_response->CopyFrom(response_temp);
}
Respond(method);
}
void SendUpdateRequest(const ConsensusRequestPB& request,
ConsensusResponsePB* response) {
    // Copy the request and the response for the other peer so that ownership
    // remains as close to the distributed implementation as possible.
ConsensusRequestPB other_peer_req;
other_peer_req.CopyFrom(request);
// Give the other peer a clean response object to write to.
ConsensusResponsePB other_peer_resp;
std::shared_ptr<RaftConsensus> peer;
Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
if (s.ok()) {
s = peer->Update(&other_peer_req, &other_peer_resp);
if (s.ok() && !other_peer_resp.has_error()) {
CHECK(other_peer_resp.has_status());
CHECK(other_peer_resp.status().IsInitialized());
}
}
if (!s.ok()) {
LOG(WARNING) << "Could not Update replica with request: "
<< pb_util::SecureShortDebugString(other_peer_req)
<< " Status: " << s.ToString();
SetResponseError(s, &other_peer_resp);
}
response->CopyFrom(other_peer_resp);
RespondOrMissResponse(request, other_peer_resp, response, kUpdate);
}
void SendVoteRequest(const VoteRequestPB& request,
VoteResponsePB* response) {
    // Copy the request and the response for the other peer so that ownership
    // remains as close to the distributed implementation as possible.
VoteRequestPB other_peer_req;
other_peer_req.CopyFrom(request);
VoteResponsePB other_peer_resp;
other_peer_resp.CopyFrom(*response);
std::shared_ptr<RaftConsensus> peer;
Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
if (s.ok()) {
s = peer->RequestVote(&other_peer_req,
TabletVotingState(boost::none, tablet::TABLET_DATA_READY),
&other_peer_resp);
}
if (!s.ok()) {
LOG(WARNING) << "Could not RequestVote from replica with request: "
<< pb_util::SecureShortDebugString(other_peer_req)
<< " Status: " << s.ToString();
SetResponseError(s, &other_peer_resp);
}
response->CopyFrom(other_peer_resp);
RespondOrMissResponse(request, other_peer_resp, response, kRequestVote);
}
void InjectCommFaultLeaderSide() {
VLOG(2) << this << ": injecting fault next time";
std::lock_guard<simple_spinlock> lock(lock_);
miss_comm_ = true;
}
const std::string& GetTarget() const {
return peer_uuid_;
}
private:
const std::string peer_uuid_;
TestPeerMapManager* const peers_;
bool miss_comm_;
};
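// Example usage (illustrative sketch; 'factory' is the
// LocalTestPeerProxyFactory defined below):
//
//   // Make the next leader-side request to every peer fail with an injected
//   // IOError instead of delivering the real response.
//   for (LocalTestPeerProxy* proxy : factory->GetProxies()) {
//     proxy->InjectCommFaultLeaderSide();
//   }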
class LocalTestPeerProxyFactory : public PeerProxyFactory {
public:
explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers)
: peers_(peers) {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
CHECK_OK(rpc::MessengerBuilder("test").Build(&messenger_));
}
Status NewProxy(const consensus::RaftPeerPB& peer_pb,
std::unique_ptr<PeerProxy>* proxy) override {
LocalTestPeerProxy* new_proxy = new LocalTestPeerProxy(peer_pb.permanent_uuid(),
pool_.get(),
peers_);
proxy->reset(new_proxy);
proxies_.push_back(new_proxy);
return Status::OK();
}
const std::vector<LocalTestPeerProxy*>& GetProxies() {
return proxies_;
}
const std::shared_ptr<rpc::Messenger>& messenger() const override {
return messenger_;
}
private:
std::unique_ptr<ThreadPool> pool_;
std::shared_ptr<rpc::Messenger> messenger_;
TestPeerMapManager* const peers_;
  // NOTE: There is no need to delete the proxies in the destructor because
  // they are externally managed.
std::vector<LocalTestPeerProxy*> proxies_;
};
// A simple implementation of the op driver.
// This is usually implemented by OpDriver, but here we keep the implementation
// to the minimum required to make consensus work.
class TestDriver {
public:
TestDriver(ThreadPool* pool, log::Log* log, const scoped_refptr<ConsensusRound>& round)
: round_(round),
pool_(pool),
log_(log) {
}
void SetRound(const scoped_refptr<ConsensusRound>& round) {
round_ = round;
}
  // Does nothing but enqueue the Apply.
void ReplicationFinished(const Status& status) {
if (status.IsAborted()) {
Cleanup();
return;
}
CHECK_OK(status);
CHECK_OK(pool_->Submit([this]() { this->Apply(); }));
}
// Called in all modes to delete the op and, transitively, the consensus
// round.
void Cleanup() {
delete this;
}
scoped_refptr<ConsensusRound> round_;
private:
  // The commit message has the exact same type as the replicate message, but
  // no content.
void Apply() {
CommitMsg msg;
msg.set_op_type(round_->replicate_msg()->op_type());
msg.mutable_commited_op_id()->CopyFrom(round_->id());
CHECK_OK(log_->AsyncAppendCommit(
msg, [this](const Status& s) { this->CommitCallback(s); }));
}
void CommitCallback(const Status& s) {
CHECK_OK(s);
Cleanup();
}
ThreadPool* pool_;
log::Log* log_;
};
// An op factory for tests; usually this is implemented by TabletReplica.
class TestOpFactory : public ConsensusRoundHandler {
public:
explicit TestOpFactory(log::Log* log)
: consensus_(nullptr),
log_(log) {
CHECK_OK(ThreadPoolBuilder("test-txn-factory").set_max_threads(1).Build(&pool_));
}
void SetConsensus(RaftConsensus* consensus) {
consensus_ = consensus;
}
Status StartFollowerOp(const scoped_refptr<ConsensusRound>& round) override {
// 'txn' is deleted when it completes.
auto* txn = new TestDriver(pool_.get(), log_, round);
txn->round_->SetConsensusReplicatedCallback(
[txn](const Status& s) { txn->ReplicationFinished(s); });
return Status::OK();
}
void FinishConsensusOnlyRound(ConsensusRound* /*round*/) override {}
void ReplicateAsync(ConsensusRound* round) {
CHECK_OK(consensus_->Replicate(round));
}
void WaitDone() {
pool_->Wait();
}
void ShutDown() {
WaitDone();
pool_->Shutdown();
}
~TestOpFactory() {
ShutDown();
}
private:
std::unique_ptr<ThreadPool> pool_;
RaftConsensus* consensus_;
log::Log* log_;
};
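// Example wiring (illustrative sketch; assumes 'log' and 'consensus' were
// built elsewhere and 'round' is a leader-side ConsensusRound):
//
//   TestOpFactory factory(log.get());
//   factory.SetConsensus(consensus.get());
//   factory.ReplicateAsync(round.get());  // CHECKs that Replicate() succeeds.
//   factory.WaitDone();                   // Wait for queued Applies to finish.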
} // namespace consensus
} // namespace kudu