blob: 82754b71bf378a10023c0d7a3bb5c99fc74aa2ff [file] [log] [blame]
// Copyright 2013 Cloudera, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/consensus-test-util.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
// Declares the "tablet" metric entity prototype (presumably defined in another
// translation unit) so that METRIC_ENTITY_tablet can be instantiated by the
// fixture below.
METRIC_DECLARE_entity(tablet);
namespace kudu {
namespace consensus {
using log::Log;
using log::LogOptions;
using log::LogAnchorRegistry;
// Identifiers shared by every test in this file.
// Declared `const char* const` so the pointers themselves cannot be reassigned.
// As const-qualified namespace-scope objects they also get internal linkage
// instead of exporting mutable global symbols.
const char* const kTabletId = "test-peers-tablet";
const char* const kLeaderUuid = "peer-0";
const char* const kFollowerUuid = "peer-1";
// Fixture that builds, for every test, the following pieces:
// - an FsManager with a fresh on-disk layout;
// - a Log for kTabletId;
// - a HybridClock;
// - a PeerMessageQueue led by kLeaderUuid, observed by a
//   TestRaftConsensusQueueIface.
// It also provides helpers to create fake remote peers and to inspect
// replication progress.
class ConsensusPeersTest : public KuduTest {
 public:
  ConsensusPeersTest()
    : metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "peer-test")),
      schema_(GetSimpleTestSchema()) {
    // Single-threaded pool shared by the peers and the fake proxies.
    CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(1).Build(&pool_));
  }

  virtual void SetUp() OVERRIDE {
    KuduTest::SetUp();
    fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root")));
    CHECK_OK(fs_manager_->CreateInitialFileSystemLayout());
    CHECK_OK(Log::Open(options_,
                       fs_manager_.get(),
                       kTabletId,
                       schema_,
                       0, // schema_version
                       NULL,
                       &log_));
    clock_.reset(new server::HybridClock());
    ASSERT_OK(clock_->Init());
    consensus_.reset(new TestRaftConsensusQueueIface());
    message_queue_.reset(new PeerMessageQueue(metric_entity_,
                                              log_.get(),
                                              FakeRaftPeerPB(kLeaderUuid),
                                              kTabletId));
    // The test consensus impl is notified by the queue; it is what
    // IsMajorityReplicated() is queried on.
    message_queue_->RegisterObserver(consensus_.get());
  }

  virtual void TearDown() OVERRIDE {
    // Wait for the log to flush everything appended during the test before
    // the fixture is torn down.
    CHECK_OK(log_->WaitUntilAllFlushed());
    KuduTest::TearDown();
  }

  // Creates a remote Peer named 'peer_name' for kTabletId, led by kLeaderUuid.
  // The peer talks to a DelayablePeerProxy that wraps a NoOpTestPeerProxy.
  // Ownership of the proxy is transferred to the peer stored in '*peer'.
  // The returned raw pointer lets the test delay or inspect responses.
  // NOTE(review): presumably it is only valid while '*peer' is alive.
  DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
      const string& peer_name,
      gscoped_ptr<Peer>* peer) {
    RaftPeerPB peer_pb;
    peer_pb.set_permanent_uuid(peer_name);
    DelayablePeerProxy<NoOpTestPeerProxy>* proxy_ptr =
        new DelayablePeerProxy<NoOpTestPeerProxy>(pool_.get(),
            new NoOpTestPeerProxy(pool_.get(), peer_pb));
    gscoped_ptr<PeerProxy> proxy(proxy_ptr);
    CHECK_OK(Peer::NewRemotePeer(peer_pb,
                                 kTabletId,
                                 kLeaderUuid,
                                 message_queue_.get(),
                                 pool_.get(),
                                 proxy.Pass(),
                                 peer));
    return proxy_ptr;
  }

  // Asserts that the latest entry in the local log has the given term/index.
  // Uses fatal assertions; wrap calls in ASSERT_NO_FATAL_FAILURE to stop the
  // calling test on mismatch.
  void CheckLastLogEntry(int term, int index) {
    OpId id;
    log_->GetLatestEntryOpId(&id);
    ASSERT_EQ(term, id.term());
    ASSERT_EQ(index, id.index());
  }

  // Asserts that the last OpId received by the fake remote endpoint behind
  // 'proxy' has the given term/index. Same fatal-failure caveat as above.
  void CheckLastRemoteEntry(DelayablePeerProxy<NoOpTestPeerProxy>* proxy, int term, int index) {
    OpId id;
    id.CopyFrom(proxy->proxy()->last_received());
    ASSERT_EQ(term, id.term());
    ASSERT_EQ(index, id.index());
  }

  // Polls the test consensus impl until it reports 'index' as replicated on a
  // majority. Between attempts it sleeps 0, 1, ..., 99 ms, about 5 s in total.
  // One final check is made after the last sleep. If the index never
  // replicates, a fatal failure is recorded via FAIL(). Because FAIL() only
  // returns from this helper, callers that must stop on timeout should wrap
  // the call in ASSERT_NO_FATAL_FAILURE.
  void WaitForMajorityReplicatedIndex(int index) {
    for (int i = 0; i < 100; i++) {
      if (consensus_->IsMajorityReplicated(index)) {
        return;
      }
      SleepFor(MonoDelta::FromMilliseconds(i));
    }
    // Don't waste the last (longest) sleep: look once more before giving up.
    if (consensus_->IsMajorityReplicated(index)) {
      return;
    }
    FAIL() << "Never replicated index " << index << " on a majority";
  }

 protected:
  // NOTE: declaration order matters.
  // Members are constructed top-down; metric_entity_ is built from
  // metric_registry_ in the constructor. They are destroyed bottom-up, so
  // consensus_ outlives the message_queue_ that holds it as an observer.
  gscoped_ptr<TestRaftConsensusQueueIface> consensus_;
  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> metric_entity_;
  gscoped_ptr<FsManager> fs_manager_;
  scoped_refptr<Log> log_;
  gscoped_ptr<PeerMessageQueue> message_queue_;
  const Schema schema_;
  LogOptions options_;
  gscoped_ptr<ThreadPool> pool_;
  scoped_refptr<server::Clock> clock_;
};
// Verifies that a single remote peer can be created and tracked by the message
// queue. Once the queued ops are majority-replicated, the fake follower
// endpoint (the proxy) must report the last op it was sent.
TEST_F(ConsensusPeersTest, TestRemotePeer) {
  // The config built here has 3 voters, so a majority is 2. The local log plus
  // the one fake remote peer created below are enough to reach it.
  message_queue_->Init(MinimumOpId());
  message_queue_->SetLeaderMode(MinimumOpId(),
                                MinimumOpId().term(),
                                BuildRaftConfigPBForTests(3));

  const int kNumOps = 20;
  // Term of the last op produced by the append below.
  const int kLastTerm = 2;

  gscoped_ptr<Peer> remote_peer;
  DelayablePeerProxy<NoOpTestPeerProxy>* follower_proxy =
      NewRemotePeer(kFollowerUuid, &remote_peer);

  // Queue ops 1..kNumOps. They end up in term kLastTerm, so bring the peer's
  // term in line with them.
  AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, kNumOps);
  remote_peer->SetTermForTest(kLastTerm);

  // Tell the peer work is pending, then block until the follower has taken
  // every queued op and the last one counts as majority-replicated.
  remote_peer->SignalRequest();
  WaitForMajorityReplicatedIndex(kNumOps);

  // The follower's watermark must match the last op we queued.
  CheckLastRemoteEntry(follower_proxy, kLastTerm, kNumOps);
}
// Tests replication with two remote peers in a 3-voter config. While one peer's
// response is delayed, an op still reaches a majority (local log + the other
// peer). A later op is not majority-replicated until a remote peer is signaled.
TEST_F(ConsensusPeersTest, TestRemotePeers) {
  message_queue_->Init(MinimumOpId());
  message_queue_->SetLeaderMode(MinimumOpId(),
                                MinimumOpId().term(),
                                BuildRaftConfigPBForTests(3));

  // Create a set of remote peers.
  gscoped_ptr<Peer> remote_peer1;
  DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer1_proxy =
      NewRemotePeer(kFollowerUuid, &remote_peer1);

  gscoped_ptr<Peer> remote_peer2;
  DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy =
      NewRemotePeer("peer-2", &remote_peer2);

  // Delay the response from the second remote peer.
  remote_peer2_proxy->DelayResponse();

  // Append one message to the queue.
  AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1);

  OpId first = MakeOpId(0, 1);

  remote_peer1->SignalRequest();
  remote_peer2->SignalRequest();

  // Now wait for the message to be replicated. This should succeed because
  // majority = 2 and only one peer was delayed; the majority is made up of
  // remote-peer1 and the local log.
  WaitForMajorityReplicatedIndex(first.index());
  CheckLastLogEntry(first.term(), first.index());
  CheckLastRemoteEntry(remote_peer1_proxy, first.term(), first.index());

  // Release the delayed response before bailing out on any failure above.
  // NOTE(review): this assumes peer teardown might otherwise block on the
  // still-outstanding request to remote_peer2.
  remote_peer2_proxy->Respond(TestPeerProxy::kUpdate);
  ASSERT_FALSE(HasFatalFailure());

  // Wait until all peers have replicated the message. Otherwise, when we add
  // the next one, remote_peer2 might find it in the queue and replicate it,
  // which is not what we want. The wait is bounded (~10s+) so a broken build
  // fails instead of hanging forever.
  for (int i = 0;
       i < 10000 && !OpIdEquals(message_queue_->GetAllReplicatedIndexForTests(), first);
       i++) {
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  ASSERT_TRUE(OpIdEquals(message_queue_->GetAllReplicatedIndexForTests(), first))
      << "Not all peers replicated the first message in time";

  // Now append another message to the queue.
  AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 2, 1);

  // We should not see it replicated, even after 10ms, since only the local
  // peer replicates the message.
  SleepFor(MonoDelta::FromMilliseconds(10));
  ASSERT_FALSE(consensus_->IsMajorityReplicated(2));

  // Signal one of the two remote peers.
  remote_peer1->SignalRequest();
  // We should now be able to wait for it to replicate, since two peers (a
  // majority) have replicated the message.
  ASSERT_NO_FATAL_FAILURE(WaitForMajorityReplicatedIndex(2));
}
// Regression test for KUDU-699. A peer whose follower never advances always
// has data pending, but Close() on it must still complete.
TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
  message_queue_->Init(MinimumOpId());
  message_queue_->SetLeaderMode(MinimumOpId(),
                                MinimumOpId().term(),
                                BuildRaftConfigPBForTests(3));

  // Ownership of the mock goes to the peer. We keep a raw pointer only to
  // script its replies.
  MockedPeerProxy* mock_proxy = new MockedPeerProxy(pool_.get());
  gscoped_ptr<Peer> peer;
  ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid),
                                kTabletId,
                                kLeaderUuid,
                                message_queue_.get(),
                                pool_.get(),
                                gscoped_ptr<PeerProxy>(mock_proxy),
                                &peer));

  // Script a follower that is stuck at op 0.0. Every response therefore leaves
  // data pending, and the queue wants to send yet another request.
  const OpId stuck_at = MakeOpId(0, 0);
  ConsensusResponsePB stuck_resp;
  stuck_resp.set_responder_uuid(kFollowerUuid);
  stuck_resp.set_responder_term(0);
  ConsensusStatusPB* stuck_status = stuck_resp.mutable_status();
  stuck_status->mutable_last_received()->CopyFrom(stuck_at);
  stuck_status->mutable_last_received_current_leader()->CopyFrom(stuck_at);
  stuck_status->set_last_committed_idx(0);
  mock_proxy->set_update_response(stuck_resp);

  // Queue a single op and start pushing requests at the peer.
  AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1);
  peer->SignalRequest(true);

  // Closing must succeed even though data is still pending.
  peer->Close();
}
// Checks that while a follower keeps answering with errors, a stream of writes
// does not turn into one UpdateConsensus RPC per write.
TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
  message_queue_->Init(MinimumOpId());
  message_queue_->SetLeaderMode(MinimumOpId(),
                                MinimumOpId().term(),
                                BuildRaftConfigPBForTests(3));

  MockedPeerProxy* mock_proxy = new MockedPeerProxy(pool_.get());
  gscoped_ptr<Peer> peer;
  ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid),
                                kTabletId,
                                kLeaderUuid,
                                message_queue_.get(),
                                pool_.get(),
                                gscoped_ptr<PeerProxy>(mock_proxy),
                                &peer));

  // The first reply has to succeed. A peer that has never answered
  // successfully is considered "new" and only receives heartbeat RPCs.
  const OpId acked = MakeOpId(1, 1);
  ConsensusResponsePB initial_resp;
  initial_resp.set_responder_uuid(kFollowerUuid);
  initial_resp.set_responder_term(0);
  ConsensusStatusPB* initial_status = initial_resp.mutable_status();
  initial_status->mutable_last_received()->CopyFrom(acked);
  initial_status->mutable_last_received_current_leader()->CopyFrom(acked);
  initial_status->set_last_committed_idx(0);
  mock_proxy->set_update_response(initial_resp);

  AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1);
  peer->SignalRequest(true);

  // The local (leader) peer always acks and the follower acked this time too,
  // so op 1 must reach a majority.
  WaitForMajorityReplicatedIndex(1);

  // From now on the follower answers every update with an error.
  ConsensusResponsePB error_resp;
  error_resp.mutable_error()->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR);
  StatusToPB(Status::NotFound("fake error"), error_resp.mutable_error()->mutable_status());
  mock_proxy->set_update_response(error_resp);

  // Queue ops 2..100 one at a time, calling SignalRequest(false) after each.
  const int kLastIndex = 100;
  for (int op_index = 2; op_index <= kLastIndex; ++op_index) {
    AppendReplicateMessagesToQueue(message_queue_.get(), clock_, op_index, 1);
    peer->SignalRequest(false);
    SleepFor(MonoDelta::FromMilliseconds(2));
  }

  // There must be far fewer UpdateConsensus calls than writes. The loop above
  // may take a second or two, so a few calls from regularly scheduled
  // heartbeats are tolerated.
  ASSERT_LT(mock_proxy->update_count(), 5);
}
} // namespace consensus
} // namespace kudu