/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster.PeerChanges;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.event.Level;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
import static org.junit.Assert.assertThrows;
public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
static {
Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
}
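// Injection points used by these tests to delay the log sync and the leader placeholder entry.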
private static final DelayLocalExecutionInjection logSyncDelay = RaftServerTestUtil.getLogSyncDelay();
private static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
new DelayLocalExecutionInjection(LeaderStateImpl.APPEND_PLACEHOLDER);
static final int STAGING_CATCHUP_GAP = 10;
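// Use a small staging catch-up gap so that bootstrapping peers can be promoted quickly in tests.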
{
RaftServerConfigKeys.setStagingCatchupGap(getProperties(), STAGING_CATCHUP_GAP);
}
private void checkPriority(CLUSTER cluster, RaftGroupId groupId, List<RaftPeer> peersWithPriority)
throws InterruptedException {
RaftTestUtil.waitForLeader(cluster, groupId);
for (int i = 0; i < peersWithPriority.size(); i++) {
RaftPeerId peerId = peersWithPriority.get(i).getId();
final RaftServer.Division server = cluster.getDivision(peerId, groupId);
final RaftConfiguration conf = server.getRaftConf();
for (int j = 0; j < peersWithPriority.size(); j++) {
int priorityInConf = conf.getPeer(peersWithPriority.get(j).getId()).getPriority();
Assert.assertEquals(peersWithPriority.get(j).getPriority(), priorityInConf);
}
}
}
@Test
public void testRestorePriority() throws Exception {
runWithNewCluster(3, cluster -> {
// Add groups
List<RaftPeer> peers = cluster.getPeers();
List<RaftPeer> peersWithPriority = new ArrayList<>();
for (int i = 0; i < peers.size(); i++) {
RaftPeer peer = peers.get(i);
peersWithPriority.add(
RaftPeer.newBuilder(peer).setPriority(i).build());
}
final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peersWithPriority);
LOG.info("add new group: " + newGroup);
try (final RaftClient client = cluster.createClient(newGroup)) {
for (RaftPeer p : newGroup.getPeers()) {
client.getGroupManagementApi(p.getId()).add(newGroup);
}
}
RaftGroupId groupId = newGroup.getGroupId();
checkPriority(cluster, groupId, peersWithPriority);
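// Restart the cluster and verify that the peer priorities survive the restart.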
cluster.restart(false);
checkPriority(cluster, groupId, peersWithPriority);
});
}
/**
* Add 2 new peers (3 peers -> 5 peers); no leader change.
*/
@Test
public void testAddPeers() throws Exception {
runWithNewCluster(3, cluster -> {
RaftTestUtil.waitForLeader(cluster);
// add new peers
RaftPeer[] allPeers = cluster.addNewPeers(2, true).allPeersInNewConf;
// trigger setConfiguration
cluster.setConfiguration(allPeers);
// wait for the new configuration to take effect
waitAndCheckNewConf(cluster, allPeers, 0, null);
});
}
/**
* Test leader election when changing cluster from single mode to HA mode.
*/
@Test
public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception {
runWithNewCluster(1, cluster -> {
RaftTestUtil.waitForLeader(cluster);
RaftGroupId groupId = cluster.getGroup().getGroupId();
RaftPeer curPeer = cluster.getGroup().getPeers().iterator().next();
RaftPeer newPeer = cluster.addNewPeers(1, true, true).newPeers[0];
RaftServerProxy leaderServer = cluster.getServer(curPeer.getId());
// Update leader conf to transitional single mode.
RaftConfigurationImpl oldNewConf = RaftConfigurationImpl.newBuilder()
.setOldConf(new PeerConfiguration(Arrays.asList(curPeer)))
.setConf(new PeerConfiguration(Arrays.asList(curPeer, newPeer)))
.setLogEntryIndex(Long.MAX_VALUE / 2)
.build();
Assert.assertTrue(oldNewConf.isSingleMode(curPeer.getId()));
RaftServerTestUtil.setRaftConf(leaderServer, groupId, oldNewConf);
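// Trigger a leadership transfer; under the transitional conf the original leader is expected to remain leader.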
try(RaftClient client = cluster.createClient()) {
client.admin().transferLeadership(null, leaderServer.getId(), 1000);
}
final RaftServer.Division newLeader = RaftTestUtil.waitForLeader(cluster);
Assert.assertEquals(leaderServer.getId(), newLeader.getId());
Assert.assertEquals(oldNewConf, newLeader.getRaftConf());
});
}
@Test
public void testChangeMajority() throws Exception {
runWithNewCluster(1, cluster -> {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
final PeerChanges c1 = cluster.addNewPeers(2, true);
SetConfigurationRequest.Arguments arguments = SetConfigurationRequest.Arguments.newBuilder()
.setServersInCurrentConf(cluster.getPeers())
.setServersInNewConf(c1.allPeersInNewConf)
.setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
.build();
assertThrows("Expect change majority error.", SetConfigurationException.class,
() -> client.admin().setConfiguration(arguments));
}
});
}
/**
* Remove 2 peers (5 peers -> 3 peers); no leader change.
*/
@Test
public void testRemovePeers() throws Exception {
runWithNewCluster(5, cluster -> {
RaftTestUtil.waitForLeader(cluster);
// remove peers, leader still included in the new conf
RaftPeer[] allPeers = cluster
.removePeers(2, false, Collections.emptyList()).allPeersInNewConf;
// trigger setConfiguration
cluster.setConfiguration(allPeers);
// wait for the new configuration to take effect
waitAndCheckNewConf(cluster, allPeers, 2, 2, null);
});
}
/**
* 5 peers -> 5 peers: remove 2 old peers and add 2 new peers; no leader change.
*/
@Test
public void testAddRemovePeers() throws Exception {
runWithNewCluster(5, cluster -> runTestAddRemovePeers(false, cluster));
}
@Test
public void testLeaderStepDown() throws Exception {
runWithNewCluster(5, cluster -> runTestAddRemovePeers(true, cluster));
}
private void runTestAddRemovePeers(boolean leaderStepdown, CLUSTER cluster) throws Exception {
RaftTestUtil.waitForLeader(cluster);
PeerChanges change = cluster.addNewPeers(2, true);
RaftPeer[] allPeers = cluster.removePeers(2, leaderStepdown,
asList(change.newPeers)).allPeersInNewConf;
// trigger setConfiguration
cluster.setConfiguration(allPeers);
// wait for the new configuration to take effect
waitAndCheckNewConf(cluster, allPeers, 2, null);
}
@Test
public void testSetConfigurationInAddMode() throws Exception {
runWithNewCluster(2, this::runTestSetConfigurationInAddMode);
}
private void runTestSetConfigurationInAddMode(CLUSTER cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
PeerChanges change = cluster.addNewPeers(1, true);
List<RaftPeer> peers = Arrays.asList(change.newPeers);
try (final RaftClient client = cluster.createClient(leader.getId())) {
for (int i = 0; i < 10; i++) {
RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
RaftClientReply reply = client.admin().setConfiguration(
SetConfigurationRequest.Arguments.newBuilder()
.setServersInNewConf(peers)
.setMode(SetConfigurationRequest.Mode.ADD).build());
Assert.assertTrue(reply.isSuccess());
waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
}
cluster.close();
}
@Test
public void testSetConfigurationInCasMode() throws Exception {
runWithNewCluster(2, this::runTestSetConfigurationInCasMode);
}
private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
List<RaftPeer> oldPeers = cluster.getPeers();
PeerChanges change = cluster.addNewPeers(1, true);
List<RaftPeer> peers = Arrays.asList(change.allPeersInNewConf);
try (final RaftClient client = cluster.createClient(leader.getId())) {
for (int i = 0; i < 10; i++) {
RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
testFailureCase("Can't set configuration in CAS mode ",
() -> client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
.setServersInNewConf(peers)
.setServersInCurrentConf(peers)
.setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
.build()), SetConfigurationException.class);
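// Shuffle the current peers to verify that COMPARE_AND_SET matching is order-insensitive.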
Collections.shuffle(oldPeers);
RaftClientReply reply = client.admin().setConfiguration(
SetConfigurationRequest.Arguments.newBuilder()
.setServersInNewConf(peers)
.setServersInCurrentConf(oldPeers)
.setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
.build());
Assert.assertTrue(reply.isSuccess());
waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
}
cluster.close();
}
@Test(timeout = 30000)
public void testReconfTwice() throws Exception {
runWithNewCluster(3, this::runTestReconfTwice);
}
void runTestReconfTwice(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
// submit some msgs before reconf
for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
final AtomicBoolean reconf1 = new AtomicBoolean(false);
final AtomicBoolean reconf2 = new AtomicBoolean(false);
final AtomicReference<RaftPeer[]> finalPeers = new AtomicReference<>(null);
final AtomicReference<RaftPeer[]> deadPeers = new AtomicReference<>(null);
CountDownLatch latch = new CountDownLatch(1);
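// Run both reconfigurations from a separate client thread; the latch signals completion.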
Thread clientThread = new Thread(() -> {
try {
PeerChanges c1 = cluster.addNewPeers(2, true, true);
LOG.info("Start changing the configuration: {}",
asList(c1.allPeersInNewConf));
RaftClientReply reply = client.admin().setConfiguration(c1.allPeersInNewConf);
reconf1.set(reply.isSuccess());
PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
finalPeers.set(c2.allPeersInNewConf);
deadPeers.set(c2.removedPeers);
LOG.info("Start changing the configuration again: {}",
asList(c2.allPeersInNewConf));
reply = client.admin().setConfiguration(c2.allPeersInNewConf);
reconf2.set(reply.isSuccess());
latch.countDown();
} catch (Exception ignored) {
LOG.warn("{} is ignored", JavaUtils.getClassSimpleName(ignored.getClass()), ignored);
}
});
clientThread.start();
latch.await();
Assert.assertTrue(reconf1.get());
Assert.assertTrue(reconf2.get());
waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);
final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId();
// check configuration manager's internal state
cluster.getServerAliveStream().forEach(server -> {
final ConfigurationManager confManager = RaftServerTestUtil.getConfigurationManager(server);
// each reconf will generate two configurations: (old, new) and (new)
// each leader change generates one configuration.
// expectedConf = 1 (init) + 2*2 (two conf changes) + #leader
final int expectedConf = leader2.equals(leaderId) ? 6 : 7;
Assert.assertEquals(server.getId() + ": " + confManager, expectedConf, confManager.numOfConf());
});
}
}
@Test
public void testReconfTimeout() throws Exception {
// originally 3 peers
runWithNewCluster(3, this::runTestReconfTimeout);
}
void runTestReconfTimeout(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
PeerChanges c1 = cluster.addNewPeers(2, false);
LOG.info("Start changing the configuration: {}",
asList(c1.allPeersInNewConf));
Assert.assertFalse(((RaftConfigurationImpl)cluster.getLeader().getRaftConf()).isTransitional());
final RaftClientRpc sender = client.getClientRpc();
final SetConfigurationRequest request = cluster.newSetConfigurationRequest(
client.getId(), leaderId, c1.allPeersInNewConf);
try {
RaftClientReply reply = sender.sendRequest(request);
Assert.fail("did not get expected exception " + reply.toString());
} catch (IOException e) {
Assert.assertTrue("Got exception " + e,
e instanceof ReconfigurationTimeoutException);
}
// The two new peers have not started yet, so the bootstrapping must time out.
LOG.info(cluster.printServers());
// resend the same request, make sure the server has correctly reset its
// state so that we still get timeout instead of in-progress exception
try {
sender.sendRequest(request);
Assert.fail("did not get expected exception");
} catch (IOException e) {
Assert.assertTrue("Got exception " + e,
e instanceof ReconfigurationTimeoutException);
}
// start the two new peers
LOG.info("Start new peers");
for (RaftPeer np : c1.newPeers) {
cluster.restartServer(np.getId(), false);
}
Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess());
}
}
@Test
public void testBootstrapReconfWithSingleNodeAddOne() throws Exception {
// originally 1 peer, add 1 more
runWithNewCluster(1, cluster -> runTestBootstrapReconf(1, true, cluster));
}
@Test
public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception {
// originally 1 peer, add 2 more
runWithNewCluster(1, cluster -> {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
final PeerChanges c1 = cluster.addNewPeers(2, true);
assertThrows("Expect change majority error.", SetConfigurationException.class,
() -> client.admin().setConfiguration(c1.allPeersInNewConf));
}
});
}
@Test
public void testBootstrapReconf() throws Exception {
// originally 3 peers, add 2 more
runWithNewCluster(3, cluster -> runTestBootstrapReconf(2, true, cluster));
}
void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluster) throws Exception {
LOG.info("Originally {} peer(s), add {} more, startNewPeer={}",
cluster.getNumServers(), numNewPeer, startNewPeer);
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
// submit some msgs before reconf
for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
final PeerChanges c1 = cluster.addNewPeers(numNewPeer, startNewPeer);
LOG.info("Start changing the configuration: {}",
asList(c1.allPeersInNewConf));
final AtomicReference<Boolean> success = new AtomicReference<>();
Thread clientThread = new Thread(() -> {
try {
RaftClientReply reply = client.admin().setConfiguration(c1.allPeersInNewConf);
success.set(reply.isSuccess());
} catch (IOException ioe) {
LOG.error("FAILED", ioe);
}
});
clientThread.start();
if (startNewPeer) {
// Make sure that set configuration is run inside the thread
RaftTestUtil.waitFor(clientThread::isAlive, 300, 5000);
ONE_SECOND.sleep();
LOG.info("start new peer(s): {}", c1.newPeers);
for(RaftPeer p : c1.newPeers) {
cluster.restartServer(p.getId(), false);
}
}
RaftTestUtil.waitFor(() -> success.get() != null && success.get(), 300, 15000);
LOG.info(cluster.printServers());
RaftTestUtil.waitFor(() -> cluster.getLeader() != null, 300, 5000);
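// Each new peer should eventually have exactly the same log entries as the leader.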
final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for (RaftPeer newPeer : c1.newPeers) {
final RaftServer.Division d = cluster.getDivision(newPeer.getId());
RaftTestUtil.waitFor(() -> leaderLog.getEntries(0, Long.MAX_VALUE).length
== d.getRaftLog().getEntries(0, Long.MAX_VALUE).length, 300, 15000);
Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
d.getRaftLog().getEntries(0, Long.MAX_VALUE));
}
}
}
/**
* Kill the leader before the reconfiguration finishes and make sure the client
* keeps retrying.
*/
@Test
public void testKillLeaderDuringReconf() throws Exception {
// originally 3 peers
runWithNewCluster(3, this::runTestKillLeaderDuringReconf);
}
void runTestKillLeaderDuringReconf(CLUSTER cluster) throws Exception {
final AtomicBoolean clientRunning = new AtomicBoolean(true);
Thread clientThread = null;
try {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
PeerChanges c1 = cluster.addNewPeers(1, false);
PeerChanges c2 = cluster.removePeers(1, false, asList(c1.newPeers));
LOG.info("Start setConf: {}", asList(c2.allPeersInNewConf));
LOG.info(cluster.printServers());
final CompletableFuture<Void> setConf = new CompletableFuture<>();
clientThread = new Thread(() -> {
try(final RaftClient client = cluster.createClient(leaderId)) {
for(int i = 0; clientRunning.get() && !setConf.isDone(); i++) {
final RaftClientReply reply = client.admin().setConfiguration(c2.allPeersInNewConf);
if (reply.isSuccess()) {
setConf.complete(null);
return;
}
LOG.info("setConf attempt #{} failed, {}", i, cluster.printServers());
}
} catch(Exception e) {
LOG.error("Failed to setConf", e);
setConf.completeExceptionally(e);
}
});
clientThread.start();
// the leader cannot generate the (old, new) conf, and it will keep
// bootstrapping the 1 new peer since it has not started yet.
Assert.assertFalse(((RaftConfigurationImpl)cluster.getLeader().getRaftConf()).isTransitional());
// (0) the first conf entry, (1) the 1st setConf entry, (2) a metadata entry,
// (3) the new current conf entry, (4) a metadata entry
{
final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for(LogEntryProto e : RaftTestUtil.getLogEntryProtos(leaderLog)) {
LOG.info("{}", LogProtoUtils.toLogEntryString(e));
}
final long commitIndex = leaderLog.getLastCommittedIndex();
Assert.assertTrue("commitIndex = " + commitIndex + " > 2", commitIndex <= 2);
}
final RaftPeerId killed = RaftTestUtil.waitAndKillLeader(cluster);
Assert.assertEquals(leaderId, killed);
final RaftPeerId newLeaderId = RaftTestUtil.waitForLeader(cluster).getId();
LOG.info("newLeaderId: {}", newLeaderId);
TimeDuration.valueOf(1500, TimeUnit.MILLISECONDS).sleep();
LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
for (RaftPeer np : c1.newPeers) {
cluster.restartServer(np.getId(), false);
}
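// setConf may still time out here, e.g. if the new peer has been shut down; both outcomes are handled below.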
try {
setConf.get(10, TimeUnit.SECONDS);
} catch(TimeoutException ignored) {
}
RaftServerProxy newServer = cluster.getServer(c1.newPeers[0].getId());
if (newServer.getLifeCycleState() == LifeCycle.State.CLOSED) {
LOG.info("New peer {} is shutdown. Skip the check", c1.newPeers[0].getId());
} else {
// the client fails with the first leader, and then retry the same setConfiguration request
waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 1, Collections.singletonList(leaderId));
setConf.get(2, TimeUnit.SECONDS);
}
} finally {
if (clientThread != null) {
clientRunning.set(false);
clientThread.interrupt();
}
}
}
/**
* When a request's new configuration is the same as the current one, make
* sure we return success immediately and no log entry is recorded.
*/
@Test
public void testNoChangeRequest() throws Exception {
// originally 3 peers
runWithNewCluster(3, this::runTestNoChangeRequest);
}
void runTestNoChangeRequest(CLUSTER cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
try(final RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new SimpleMessage("m"));
final RaftLog leaderLog = leader.getRaftLog();
final long committedIndex = leaderLog.getLastCommittedIndex();
final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();
// no real configuration change in the request
final RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray()));
Assert.assertTrue(reply.isSuccess());
final long newCommittedIndex = leaderLog.getLastCommittedIndex();
for(long i = committedIndex + 1; i <= newCommittedIndex; i++) {
final LogEntryProto e = leaderLog.get(i);
Assert.assertTrue(e.hasMetadataEntry());
}
Assert.assertSame(confBefore, cluster.getLeader().getRaftConf());
}
}
/**
* Make sure a setConfiguration request is rejected if a configuration change
* is still in progress (i.e., has not been committed yet).
*/
@Test
public void testOverlappedSetConfRequests() throws Exception {
// Make sure leadership check won't affect the test.
// logSyncDelay.setDelayMs() blocks RaftServerImpl::appendEntriesAsync(),
// which will disable heartbeats.
final TimeDuration oldTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), TimeDuration.valueOf(2, TimeUnit.SECONDS));
final TimeDuration oldTimeoutMax = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), TimeDuration.valueOf(4, TimeUnit.SECONDS));
// originally 3 peers
runWithNewCluster(3, this::runTestOverlappedSetConfRequests);
// reset for the other tests
RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), oldTimeoutMin);
RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), oldTimeoutMax);
}
void runTestOverlappedSetConfRequests(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
RaftPeer[] newPeers = cluster.addNewPeers(2, true).allPeersInNewConf;
// delay every peer's logSync so that the setConf request is delayed
cluster.getPeers()
.forEach(peer -> logSyncDelay.setDelayMs(peer.getId().toString(), 1000));
final CountDownLatch latch = new CountDownLatch(1);
final RaftPeer[] peersInRequest2 = cluster.getPeers().toArray(new RaftPeer[0]);
AtomicBoolean caughtException = new AtomicBoolean(false);
new Thread(() -> {
try(final RaftClient client2 = cluster.createClient(leaderId)) {
latch.await();
LOG.info("client2 starts to change conf");
final RaftClientRpc sender2 = client2.getClientRpc();
sender2.sendRequest(cluster.newSetConfigurationRequest(
client2.getId(), leaderId, peersInRequest2));
} catch (ReconfigurationInProgressException e) {
caughtException.set(true);
} catch (Exception e) {
LOG.warn("Got unexpected exception when client2 changes conf", e);
}
}).start();
AtomicBoolean confChanged = new AtomicBoolean(false);
new Thread(() -> {
try(final RaftClient client1 = cluster.createClient(leaderId)) {
LOG.info("client1 starts to change conf");
confChanged.set(client1.admin().setConfiguration(newPeers).isSuccess());
} catch (IOException e) {
LOG.warn("Got unexpected exception when client1 changes conf", e);
}
}).start();
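// Let client1's request reach the leader first, then release client2 via the latch.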
Thread.sleep(100);
latch.countDown();
for (int i = 0; i < 10 && !confChanged.get(); i++) {
Thread.sleep(1000);
}
Assert.assertTrue(confChanged.get());
Assert.assertTrue(caughtException.get());
} finally {
logSyncDelay.clear();
}
}
/**
* Test a scenario where a follower truncates its log entries, which causes a
* configuration change to be reverted.
*/
@Test
public void testRevertConfigurationChange() throws Exception {
runWithNewCluster(5, this::runTestRevertConfigurationChange);
}
void runTestRevertConfigurationChange(CLUSTER cluster) throws Exception {
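// Keep a reference to the leader's log so that it can be printed if the test fails.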
RaftLogBase log2 = null;
try {
RaftTestUtil.waitForLeader(cluster);
final RaftServer.Division leader = cluster.getLeader();
final RaftPeerId leaderId = leader.getId();
final RaftLog log = leader.getRaftLog();
log2 = (RaftLogBase) log;
Thread.sleep(1000);
// Block incoming messages to the leader and block its requests to the
// followers, so that a leader change is forced without the old leader
// knowing about it.
LOG.info("start blocking the leader");
BlockRequestHandlingInjection.getInstance().blockReplier(leaderId.toString());
cluster.setBlockRequestsFrom(leaderId.toString(), true);
PeerChanges change = cluster.removePeers(1, false, new ArrayList<>());
AtomicBoolean gotNotLeader = new AtomicBoolean(false);
final Thread clientThread = new Thread(() -> {
try(final RaftClient client = cluster.createClient(leaderId)) {
LOG.info("client starts to change conf");
final RaftClientRpc sender = client.getClientRpc();
RaftClientReply reply = sender.sendRequest(cluster.newSetConfigurationRequest(
client.getId(), leaderId, change.allPeersInNewConf));
if (reply.getNotLeaderException() != null) {
gotNotLeader.set(true);
}
} catch (IOException e) {
LOG.warn("Got unexpected exception when client1 changes conf", e);
}
});
clientThread.start();
// find ConfigurationEntry
final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
final long confIndex = JavaUtils.attemptRepeatedly(() -> {
final long last = log.getLastEntryTermIndex().getIndex();
for (long i = last; i >= 1; i--) {
if (log.get(i).hasConfigurationEntry()) {
return i;
}
}
throw new Exception("ConfigurationEntry not found: last=" + last);
}, 10, sleepTime, "confIndex", LOG);
// wait till the old leader persists the new conf
JavaUtils.attemptRepeatedly(() -> {
Assert.assertTrue(log.getFlushIndex() >= confIndex);
return null;
}, 10, sleepTime, "FLUSH", LOG);
final long committed = log.getLastCommittedIndex();
Assert.assertTrue(committed < confIndex);
// unblock the old leader
BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString());
cluster.setBlockRequestsFrom(leaderId.toString(), false);
// the client should get NotLeaderException
clientThread.join(5000);
Assert.assertTrue(gotNotLeader.get());
// the old leader should have truncated the setConf from the log
JavaUtils.attemptRepeatedly(() -> {
Assert.assertTrue(log.getLastCommittedIndex() >= confIndex);
return null;
}, 10, ONE_SECOND, "COMMIT", LOG);
Assert.assertTrue(log.get(confIndex).hasConfigurationEntry());
log2 = null;
} finally {
RaftStorageTestUtils.printLog(log2, LOG::info);
}
}
/**
* Delay the commit of the leader placeholder log entry and see if the client
* can correctly receive and handle the LeaderNotReadyException.
*/
@Test
public void testLeaderNotReadyException() throws Exception {
LOG.info("Start testLeaderNotReadyException");
final MiniRaftCluster cluster = newCluster(1).initServers();
try {
// delay 2s for the leader placeholder entry of each server
cluster.getServers().forEach(
peer -> leaderPlaceHolderDelay.setDelayMs(peer.getId().toString(), 2000));
cluster.start();
AtomicBoolean caughtNotReady = new AtomicBoolean(false);
AtomicBoolean success = new AtomicBoolean(false);
final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId();
new Thread(() -> {
try (final RaftClient client = cluster.createClient(leaderId)) {
final RaftClientRpc sender = client.getClientRpc();
final RaftClientRequest request = cluster.newRaftClientRequest(
client.getId(), leaderId, new SimpleMessage("test"));
while (!success.get()) {
try {
final RaftClientReply reply = sender.sendRequest(request);
success.set(reply.isSuccess());
if (reply.getException() instanceof LeaderNotReadyException) {
caughtNotReady.set(true);
}
} catch (IOException e) {
LOG.info("Hit other IOException", e);
}
if (!success.get()) {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
} catch (Exception ignored) {
LOG.warn("{} is ignored", JavaUtils.getClassSimpleName(ignored.getClass()), ignored);
}
}).start();
RaftTestUtil.waitForLeader(cluster);
for (int i = 0; !success.get() && i < 5; i++) {
Thread.sleep(1000);
}
Assert.assertTrue(success.get());
Assert.assertTrue(caughtNotReady.get());
} finally {
leaderPlaceHolderDelay.clear();
cluster.shutdown();
}
}
}