blob: a5b48dbe4c91f30295850a719967d71d33ed983c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.LeaderElectionRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME;
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_COUNT_METRIC;
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIME_TAKEN;
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIMEOUT_COUNT_METRIC;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.codahale.metrics.Timer;
public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
Log4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
@Test
public void testBasicLeaderElection() throws Exception {
LOG.info("Running testBasicLeaderElection");
final MiniRaftCluster cluster = newCluster(5);
cluster.start();
RaftTestUtil.waitAndKillLeader(cluster);
RaftTestUtil.waitAndKillLeader(cluster);
RaftTestUtil.waitAndKillLeader(cluster);
testFailureCase("waitForLeader after killed a majority of servers",
() -> RaftTestUtil.waitForLeader(cluster, null, false),
IllegalStateException.class);
cluster.shutdown();
}
@Test
public void testChangeLeader() throws Exception {
SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE);
LOG.info("Running testChangeLeader");
final MiniRaftCluster cluster = newCluster(3);
cluster.start();
RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId();
for(int i = 0; i < 10; i++) {
leader = RaftTestUtil.changeLeader(cluster, leader, IllegalStateException::new);
ExitUtils.assertNotTerminated();
}
SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.INFO);
cluster.shutdown();
}
@Test
public void testLostMajorityHeartbeats() throws Exception {
runWithNewCluster(3, this::runTestLostMajorityHeartbeats);
}
void runTestLostMajorityHeartbeats(CLUSTER cluster) throws Exception {
final TimeDuration maxTimeout = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
final RaftServer.Division leader = waitForLeader(cluster);
try {
isolate(cluster, leader.getId());
maxTimeout.sleep();
maxTimeout.sleep();
RaftServerTestUtil.assertLostMajorityHeartbeatsRecently(leader);
} finally {
deIsolate(cluster, leader.getId());
}
}
@Test
public void testTransferLeader() throws Exception {
try(final MiniRaftCluster cluster = newCluster(3)) {
cluster.start();
final RaftServer.Division leader = waitForLeader(cluster);
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
List<RaftServer.Division> followers = cluster.getFollowers();
Assert.assertEquals(followers.size(), 2);
RaftServer.Division newLeader = followers.get(0);
List<RaftPeer> peers = cluster.getPeers();
List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
reply = client.admin().transferLeadership(newLeader.getId(), 20000);
assertTrue(reply.isSuccess());
final RaftServer.Division currLeader = waitForLeader(cluster);
assertTrue(newLeader.getId() == currLeader.getId());
reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assert.assertTrue(reply.getReplierId().equals(newLeader.getId().toString()));
Assert.assertTrue(reply.isSuccess());
}
cluster.shutdown();
}
}
@Test
public void testTransferLeaderTimeout() throws Exception {
try(final MiniRaftCluster cluster = newCluster(3)) {
cluster.start();
final RaftServer.Division leader = waitForLeader(cluster);
try (RaftClient client = cluster.createClient(leader.getId())) {
List<RaftServer.Division> followers = cluster.getFollowers();
Assert.assertEquals(followers.size(), 2);
RaftServer.Division newLeader = followers.get(0);
// isolate new leader, so that transfer leadership will timeout
isolate(cluster, newLeader.getId());
List<RaftPeer> peers = cluster.getPeers();
List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
CompletableFuture<Boolean> transferTimeoutFuture = CompletableFuture.supplyAsync(() -> {
try {
long timeoutMs = 5000;
long start = System.currentTimeMillis();
try {
client.admin().transferLeadership(newLeader.getId(), timeoutMs);
} catch (TransferLeadershipException e) {
long cost = System.currentTimeMillis() - start;
Assert.assertTrue(cost > timeoutMs);
Assert.assertTrue(e.getMessage().contains("Failed to transfer leadership to"));
Assert.assertTrue(e.getMessage().contains("timed out"));
}
return true;
} catch (IOException e) {
return false;
}
});
// before transfer timeout, leader should in steppingDown
JavaUtils.attemptRepeatedly(() -> {
try {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
} catch (LeaderSteppingDownException e) {
Assert.assertTrue(e.getMessage().contains("is stepping down"));
}
return null;
}, 5, TimeDuration.ONE_SECOND, "check leader steppingDown", RaftServer.LOG);
Assert.assertTrue(transferTimeoutFuture.get());
// after transfer timeout, leader should accept request
reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString()));
Assert.assertTrue(reply.isSuccess());
deIsolate(cluster, newLeader.getId());
}
cluster.shutdown();
}
}
@Test
public void testEnforceLeader() throws Exception {
LOG.info("Running testEnforceLeader");
final int numServer = 5;
try(final MiniRaftCluster cluster = newCluster(numServer)) {
cluster.start();
final RaftPeerId firstLeader = waitForLeader(cluster).getId();
LOG.info("firstLeader = {}", firstLeader);
final int first = MiniRaftCluster.getIdIndex(firstLeader.toString());
final int random = ThreadLocalRandom.current().nextInt(numServer - 1);
final String newLeader = "s" + (random < first? random: random + 1);
LOG.info("enforce leader to {}", newLeader);
enforceLeader(cluster, newLeader, LOG);
}
}
static void enforceLeader(MiniRaftCluster cluster, final String newLeader, Logger LOG) throws InterruptedException {
LOG.info(cluster.printServers());
for(int i = 0; !cluster.tryEnforceLeader(newLeader) && i < 10; i++) {
final RaftServer.Division currLeader = cluster.getLeader();
LOG.info("try enforcing leader to " + newLeader + " but " +
(currLeader == null ? "no leader for round " + i : "new leader is " + currLeader.getId()));
}
LOG.info(cluster.printServers());
final RaftServer.Division leader = cluster.getLeader();
Assert.assertEquals(newLeader, leader.getId().toString());
}
@Test
public void testLateServerStart() throws Exception {
final int numServer = 3;
LOG.info("Running testLateServerStart");
final MiniRaftCluster cluster = newCluster(numServer);
cluster.initServers();
// start all except one servers
final Iterator<RaftServer> i = cluster.getServers().iterator();
for(int j = 1; j < numServer; j++) {
i.next().start();
}
final RaftServer.Division leader = waitForLeader(cluster);
final TimeDuration sleepTime = TimeDuration.valueOf(3, TimeUnit.SECONDS);
LOG.info("sleep " + sleepTime);
sleepTime.sleep();
// start the last server
final RaftServerProxy lastServer = (RaftServerProxy) i.next();
lastServer.start();
final RaftPeerId lastServerLeaderId = JavaUtils.attemptRepeatedly(
() -> Optional.ofNullable(lastServer.getImpls().iterator().next().getState().getLeaderId())
.orElseThrow(() -> new IllegalStateException("No leader yet")),
10, ONE_SECOND, "getLeaderId", LOG);
LOG.info(cluster.printServers());
Assert.assertEquals(leader.getId(), lastServerLeaderId);
}
protected void testDisconnectLeader() throws Exception {
try(final MiniRaftCluster cluster = newCluster(3)) {
cluster.start();
final RaftServer.Division leader = waitForLeader(cluster);
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
Thread.sleep(1000);
isolate(cluster, leader.getId());
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assert.assertNotEquals(reply.getReplierId(), leader.getId().toString());
Assert.assertTrue(reply.isSuccess());
} finally {
deIsolate(cluster, leader.getId());
}
cluster.shutdown();
}
}
private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
try {
BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), true);
} catch (Exception e) {
e.printStackTrace();
}
}
private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
cluster.setBlockRequestsFrom(id.toString(), false);
}
@Test
public void testLeaderElectionMetrics() throws IOException, InterruptedException {
Timestamp timestamp = Timestamp.currentTime();
final MiniRaftCluster cluster = newCluster(3);
cluster.start();
final RaftServer.Division leaderServer = waitForLeader(cluster);
final RatisMetricRegistry ratisMetricRegistry = LeaderElectionMetrics.getMetricRegistryForLeaderElection(
leaderServer.getMemberId());
// Verify each metric individually.
long numLeaderElections = ratisMetricRegistry.counter(LEADER_ELECTION_COUNT_METRIC).getCount();
assertTrue(numLeaderElections > 0);
long numLeaderElectionTimeout = ratisMetricRegistry.counter(LEADER_ELECTION_TIMEOUT_COUNT_METRIC).getCount();
assertTrue(numLeaderElectionTimeout > 0);
Timer timer = ratisMetricRegistry.timer(LEADER_ELECTION_TIME_TAKEN);
double meanTimeNs = timer.getSnapshot().getMean();
long elapsedNs = timestamp.elapsedTime().toLong(TimeUnit.NANOSECONDS);
assertTrue(timer.getCount() > 0 && meanTimeNs < elapsedNs);
Long leaderElectionLatency = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(LAST_LEADER_ELECTION_ELAPSED_TIME)).values().iterator().next().getValue();
assertTrue(leaderElectionLatency > 0L);
}
@Test
public void testImmediatelyRevertedToFollower() {
RaftServerImpl server = createMockServer(true);
LeaderElection subject = new LeaderElection(server, false);
try {
subject.startInForeground();
assertEquals(LifeCycle.State.CLOSED, subject.getCurrentState());
} catch (Exception e) {
LOG.info("Error starting LeaderElection", e);
fail(e.getMessage());
}
}
@Test
public void testShutdownBeforeStart() {
RaftServerImpl server = createMockServer(false);
LeaderElection subject = new LeaderElection(server, false);
try {
subject.shutdown();
subject.startInForeground();
assertEquals(LifeCycle.State.CLOSED, subject.getCurrentState());
} catch (Exception e) {
LOG.info("Error starting LeaderElection", e);
fail(e.getMessage());
}
}
@Test
public void testPreVote() {
try(final MiniRaftCluster cluster = newCluster(3)) {
cluster.start();
RaftServer.Division leader = waitForLeader(cluster);
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
final List<RaftServer.Division> followers = cluster.getFollowers();
assertEquals(followers.size(), 2);
RaftServer.Division follower = followers.get(0);
isolate(cluster, follower.getId());
// send message so that the isolated follower's log lag the others
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assert.assertTrue(reply.isSuccess());
final long savedTerm = leader.getInfo().getCurrentTerm();
LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
Thread.sleep(2000);
deIsolate(cluster, follower.getId());
Thread.sleep(2000);
// with pre-vote leader will not step down
RaftServer.Division newleader = waitForLeader(cluster);
assertNotNull(newleader);
assertEquals(newleader.getId(), leader.getId());
// with pre-vote, term will not change
assertEquals(savedTerm, leader.getInfo().getCurrentTerm());
reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assert.assertTrue(reply.isSuccess());
}
cluster.shutdown();
} catch (Exception e) {
fail(e.getMessage());
}
}
@Test
public void testPauseResumeLeaderElection() throws Exception {
runWithNewCluster(3, this::runTestPauseResumeLeaderElection);
}
void runTestPauseResumeLeaderElection(CLUSTER cluster) throws IOException, InterruptedException {
final RaftClientReply pauseLeaderReply;
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
final List<RaftServer.Division> followers = cluster.getFollowers();
Assert.assertTrue(followers.size() >= 1);
final RaftServerImpl f1 = (RaftServerImpl)followers.get(0);
try (final RaftClient client = cluster.createClient()) {
final LeaderElectionRequest r = LeaderElectionRequest.newPause(
client.getId(), f1.getId(),cluster.getGroupId(), CallId.getAndIncrement(), 3000);
pauseLeaderReply = f1.getRaftServer().setLeaderElectionAsync(r).join();
Assert.assertTrue(pauseLeaderReply.isSuccess());
client.io().send(new RaftTestUtil.SimpleMessage("message"));
RaftServer.Division newLeader = followers.get(0);
List<RaftPeer> peers = cluster.getPeers();
List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
JavaUtils.attempt(() -> Assert.assertEquals(leaderId, leader.getId()),
20, HUNDRED_MILLIS, "check leader id", LOG);
final LeaderElectionRequest r1 = LeaderElectionRequest.newResume(
client.getId(), f1.getId(),cluster.getGroupId(), CallId.getAndIncrement(), 3000);
final RaftClientReply resumeLeaderReply = f1.getRaftServer().setLeaderElectionAsync(r1).join();
Assert.assertTrue(resumeLeaderReply.isSuccess());
JavaUtils.attempt(() -> Assert.assertEquals(f1.getId(), cluster.getLeader().getId()),
20, HUNDRED_MILLIS, "check new leader", LOG);
}
}
private static RaftServerImpl createMockServer(boolean alive) {
final DivisionInfo info = mock(DivisionInfo.class);
when(info.isAlive()).thenReturn(alive);
when(info.isCandidate()).thenReturn(false);
RaftServerImpl server = mock(RaftServerImpl.class);
when(server.getInfo()).thenReturn(info);
final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(RaftPeerId.valueOf("any"), RaftGroupId.randomId());
when(server.getMemberId()).thenReturn(memberId);
LeaderElectionMetrics leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(memberId, () -> 0);
when(server.getLeaderElectionMetrics()).thenReturn(leaderElectionMetrics);
RaftServerProxy proxy = mock(RaftServerProxy.class);
RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.LeaderElection.setPreVote(properties, true);
when(proxy.getProperties()).thenReturn(properties);
when(server.getRaftServer()).thenReturn(proxy);
return server;
}
}