/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.jraft.core;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.synchronizedList;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
import static org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
import static org.apache.ignite.raft.jraft.test.TestUtils.sender;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.codahale.metrics.ConsoleReporter;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.JoinableClosure;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.entity.UserLog;
import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
import org.apache.ignite.raft.jraft.error.LogNotFoundException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.BootstrapOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.ReadOnlyOption;
import org.apache.ignite.raft.jraft.rpc.AppendEntriesRequestImpl;
import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseImpl;
import org.apache.ignite.raft.jraft.rpc.RpcClientEx;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle;
import org.apache.ignite.raft.jraft.test.TestPeer;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
* Integration tests for raft cluster. TODO asch get rid of sleeps wherever possible IGNITE-14832
*/
@ExtendWith(WorkDirectoryExtension.class)
public class ItNodeTest extends BaseIgniteAbstractTest {
private static final IgniteLogger log = Loggers.forClass(ItNodeTest.class);
private static DumpThread dumpThread;
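/** Daemon thread that dumps all thread stacks if a test hangs for longer than {@code DUMP_TIMEOUT_MS}. */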
private static class DumpThread extends Thread {
private static final long DUMP_TIMEOUT_MS = 5 * 60 * 1000;
private volatile boolean stopped = false;
/** {@inheritDoc} */
@SuppressWarnings("BusyWait") @Override
public void run() {
while (!stopped) {
try {
Thread.sleep(DUMP_TIMEOUT_MS);
log.info("Test hang too long, dump threads");
TestUtils.dumpThreads();
}
catch (InterruptedException e) {
// The interrupt is a dump-timeout reset request from setup(); keep waiting.
continue;
}
}
}
}
private String dataPath;
private final AtomicInteger startedCounter = new AtomicInteger(0);
private final AtomicInteger stoppedCounter = new AtomicInteger(0);
private long testStartMs;
private TestCluster cluster;
private final List<RaftGroupService> services = new ArrayList<>();
private final List<ExecutorService> executors = new ArrayList<>();
private final List<FixedThreadsExecutorGroup> appendEntriesExecutors = new ArrayList<>();
/** Test info. */
private TestInfo testInfo;
@BeforeAll
public static void setupNodeTest() {
dumpThread = new DumpThread();
dumpThread.setName("NodeTest-DumpThread");
dumpThread.setDaemon(true);
dumpThread.start();
}
@AfterAll
public static void tearNodeTest() throws Exception {
dumpThread.stopped = true;
dumpThread.interrupt();
dumpThread.join(100);
}
@BeforeEach
public void setup(TestInfo testInfo, @WorkDirectory Path workDir) throws Exception {
log.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName());
this.testInfo = testInfo;
dataPath = workDir.toString();
testStartMs = Utils.monotonicMs();
dumpThread.interrupt(); // reset dump timeout
}
@AfterEach
public void teardown() throws Exception {
services.forEach(service -> {
try {
service.shutdown();
}
catch (Exception e) {
log.error("Error while closing a service", e);
}
});
executors.forEach(ExecutorServiceHelper::shutdownAndAwaitTermination);
appendEntriesExecutors.forEach(FixedThreadsExecutorGroup::shutdownGracefully);
if (cluster != null)
cluster.stopAll();
startedCounter.set(0);
stoppedCounter.set(0);
TestUtils.assertAllJraftThreadsStopped();
log.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName() + ", cost:"
+ (Utils.monotonicMs() - testStartMs) + " ms.");
}
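/** Tests that a raft group service can be initialized and started on a single peer. */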
@Test
public void testInitShutdown() {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
NodeOptions nodeOptions = createNodeOptions(0);
nodeOptions.setFsm(new MockStateMachine(peer.getPeerId()));
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
service.start();
}
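/** Tests that overflowing the disruptor buffer (size 2) with 10 tasks fails the excess tasks with EBUSY or EPERM while every closure still completes. */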
@Test
public void testNodeTaskOverload() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
NodeOptions nodeOptions = createNodeOptions(0);
RaftOptions raftOptions = new RaftOptions();
raftOptions.setDisruptorBufferSize(2);
nodeOptions.setRaftOptions(raftOptions);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
nodeOptions.setFsm(fsm);
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId())));
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
Node node = service.start();
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer.getPeerId()));
// Busy-wait until the node becomes the leader.
while (!node.isLeader())
    ;
List<Task> tasks = new ArrayList<>();
AtomicInteger c = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes(UTF_8));
int finalI = i;
Task task = new Task(data, new JoinableClosure(status -> {
log.info("{} i={}", status, finalI);
if (!status.isOk()) {
assertTrue(
status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM);
}
c.incrementAndGet();
}));
node.apply(task);
tasks.add(task);
}
Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30));
assertEquals(10, c.get());
}
/**
* Tests rolling back the state machine with readIndex, for issue 317: https://github.com/sofastack/sofa-jraft/issues/317
*/
@Test
public void testRollbackStateMachineWithReadIndex_Issue317() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
NodeOptions nodeOptions = createNodeOptions(0);
CountDownLatch applyCompleteLatch = new CountDownLatch(1);
CountDownLatch applyLatch = new CountDownLatch(1);
CountDownLatch readIndexLatch = new CountDownLatch(1);
AtomicInteger currentValue = new AtomicInteger(-1);
String errorMsg = testInfo.getDisplayName();
StateMachine fsm = new StateMachineAdapter() {
@Override
public void onApply(Iterator iter) {
// Notify that #onApply is about to start.
readIndexLatch.countDown();
// Wait until the read-index request has been submitted.
try {
applyLatch.await();
}
catch (InterruptedException e) {
fail();
}
int i = 0;
while (iter.hasNext()) {
byte[] data = iter.next().array();
int v = Bits.getInt(data, 0);
assertEquals(i++, v);
currentValue.set(v);
}
if (i > 0) {
// rollback
currentValue.set(i - 1);
iter.setErrorAndRollback(1, new Status(-1, errorMsg));
applyCompleteLatch.countDown();
}
}
};
nodeOptions.setFsm(fsm);
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId())));
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
Node node = service.start();
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer.getPeerId()));
// Busy-wait until the node becomes the leader.
while (!node.isLeader())
    ;
int n = 5;
{
// apply tasks
for (int i = 0; i < n; i++) {
byte[] b = new byte[4];
Bits.putInt(b, 0, i);
node.apply(new Task(ByteBuffer.wrap(b), null));
}
}
AtomicInteger readIndexSuccesses = new AtomicInteger(0);
{
// Submit a read-index, wait for #onApply
readIndexLatch.await();
CountDownLatch latch = new CountDownLatch(1);
node.readIndex(null, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
try {
if (status.isOk())
readIndexSuccesses.incrementAndGet();
else {
assertTrue(
status.getErrorMsg().contains(errorMsg) || status.getRaftError() == RaftError.ETIMEDOUT
|| status.getErrorMsg().contains("Invalid state for readIndex: STATE_ERROR"),
"Unexpected status: " + status);
}
}
finally {
latch.countDown();
}
}
});
// We have already submitted a read-index request,
// notify #onApply that it can proceed now.
applyLatch.countDown();
// The state machine is in error state, the node should step down.
waitForCondition(() -> !node.isLeader(), 5_000);
latch.await();
applyCompleteLatch.await();
}
// No read-index request succeeded.
assertEquals(0, readIndexSuccesses.get());
assertTrue(n - 1 >= currentValue.get());
}
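/** Tests that a single-node group elects itself leader and applies all submitted tasks to its state machine. */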
@Test
public void testSingleNode() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
NodeOptions nodeOptions = createNodeOptions(0);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
nodeOptions.setFsm(fsm);
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId())));
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
Node node = service.start();
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer.getPeerId()));
// Busy-wait until the node becomes the leader.
while (!node.isLeader())
    ;
sendTestTaskAndWait(node);
assertEquals(10, fsm.getLogs().size());
int i = 0;
for (ByteBuffer data : fsm.getLogs()) {
assertEquals("hello" + i++, stringFromBytes(data.array()));
}
}
private String stringFromBytes(byte[] bytes) {
return new String(bytes, UTF_8);
}
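/** Tests that task application and membership changes fail with EPERM when the group has no leader. */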
@Test
public void testNoLeader() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
assertTrue(cluster.start(peers.get(0)));
List<Node> followers = cluster.getFollowers();
assertEquals(1, followers.size());
Node follower = followers.get(0);
sendTestTaskAndWait(follower, 0, RaftError.EPERM);
// try to add peer3; it should fail with EPERM since there is no leader
PeerId peer3 = new PeerId(UUID.randomUUID().toString());
CountDownLatch latch = new CountDownLatch(1);
follower.addPeer(peer3, new ExpectClosure(RaftError.EPERM, latch));
waitLatch(latch);
// try to remove peer0; it should fail with EPERM as well
PeerId peer0 = peers.get(0).getPeerId();
latch = new CountDownLatch(1);
follower.removePeer(peer0, new ExpectClosure(RaftError.EPERM, latch));
waitLatch(latch);
}
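/** Tests that replicator state listeners are notified when replicators are created and can be removed individually from each node. */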
@Test
public void testTripleNodesWithReplicatorStateListener() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
UserReplicatorStateListener listener1 = new UserReplicatorStateListener();
UserReplicatorStateListener listener2 = new UserReplicatorStateListener();
cluster = new TestCluster("unitest", dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT_MILLIS,
opts -> opts.setReplicationStateListeners(List.of(listener1, listener2)), testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// elect leader
cluster.ensureLeader(cluster.waitAndGetLeader());
for (Node follower : cluster.getFollowers())
waitForCondition(() -> follower.getLeaderId() != null, 5_000);
assertEquals(4, startedCounter.get());
assertEquals(2, cluster.getLeader().getReplicatorStateListeners().size());
assertEquals(2, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
assertEquals(2, cluster.getFollowers().get(1).getReplicatorStateListeners().size());
for (Node node : cluster.getNodes())
node.removeReplicatorStateListener(listener1);
assertEquals(1, cluster.getLeader().getReplicatorStateListeners().size());
assertEquals(1, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
assertEquals(1, cluster.getFollowers().get(1).getReplicatorStateListeners().size());
}
// TODO asch Broken when using volatile log. A follower with an empty log can become a leader IGNITE-14832.
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-14832")
public void testVoteTimedoutStepDown() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
// Stop all followers
List<Node> followers = cluster.getFollowers();
assertFalse(followers.isEmpty());
for (Node node : followers)
assertTrue(cluster.stop(node.getNodeId().getPeerId()));
// Wait for the leader to step down.
while (leader.isLeader())
Thread.sleep(10);
// The old leader tries to elect itself; it should fail.
((NodeImpl) leader).tryElectSelf();
Thread.sleep(1500);
assertNull(cluster.getLeader());
// Start followers
for (Node node : followers) {
assertTrue(cluster.start(findById(peers, node.getNodeId().getPeerId())));
}
cluster.ensureSame();
}
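/** Replicator state listener that tracks replicator creation and destruction via the test's counters. */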
class UserReplicatorStateListener implements Replicator.ReplicatorStateListener {
/** {@inheritDoc} */
@Override
public void onCreated(PeerId peer) {
int val = startedCounter.incrementAndGet();
log.info("Replicator has been created {} {}", peer, val);
}
@Override
public void stateChanged(final PeerId peer, final ReplicatorState newState) {
log.info("Replicator {} state is changed into {}.", peer, newState);
}
/** {@inheritDoc} */
@Override
public void onError(PeerId peer, Status status) {
log.info("Replicator has errors {} {}", peer, status);
}
/** {@inheritDoc} */
@Override
public void onDestroyed(PeerId peer) {
int val = stoppedCounter.incrementAndGet();
log.info("Replicator has been destroyed {} {}", peer, val);
}
}
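/** Tests that transferring leadership re-creates replicators on the new leader, notifying the registered listeners, and that listeners can be cleared. */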
@Test
public void testLeaderTransferWithReplicatorStateListener() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT_MILLIS,
opts -> opts.setReplicationStateListeners(List.of(new UserReplicatorStateListener())), testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
sendTestTaskAndWait(leader);
Thread.sleep(100);
List<Node> followers = cluster.getFollowers();
assertTrue(waitForCondition(() -> startedCounter.get() == 2, 5_000), startedCounter.get() + "");
PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
log.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(1000);
cluster.waitAndGetLeader();
assertTrue(waitForCondition(() -> startedCounter.get() == 4, 5_000), startedCounter.get() + "");
for (Node node : cluster.getNodes())
node.clearReplicatorStateListeners();
assertEquals(0, cluster.getLeader().getReplicatorStateListeners().size());
assertEquals(0, cluster.getFollowers().get(0).getReplicatorStateListeners().size());
assertEquals(0, cluster.getFollowers().get(1).getReplicatorStateListeners().size());
}
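/** Tests basic replication on a three-node group, including a task without a closure and a TaskClosure whose onCommitted runs before its run callback. */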
@Test
public void testTripleNodes() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
{
ByteBuffer data = ByteBuffer.wrap("no closure".getBytes(UTF_8));
Task task = new Task(data, null);
leader.apply(task);
}
{
// task with TaskClosure
ByteBuffer data = ByteBuffer.wrap("task closure".getBytes(UTF_8));
List<String> cbs = synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(1);
Task task = new Task(data, new TaskClosure() {
@Override
public void run(Status status) {
cbs.add("apply");
latch.countDown();
}
@Override
public void onCommitted() {
cbs.add("commit");
}
});
leader.apply(task);
latch.await();
assertEquals(2, cbs.size());
assertEquals("commit", cbs.get(0));
assertEquals("apply", cbs.get(1));
}
cluster.ensureSame();
assertEquals(2, cluster.getFollowers().size());
}
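/** Tests that a leader replicates all log entries to a learner, which applies them without taking part in elections. */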
@Test
public void testSingleNodeWithLearner() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
TestPeer learnerPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
final int cnt = 10;
MockStateMachine learnerFsm;
RaftGroupService learnerServer;
{
// Start learner
NodeOptions nodeOptions = createNodeOptions(0);
learnerFsm = new MockStateMachine(learnerPeer.getPeerId());
nodeOptions.setFsm(learnerFsm);
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta1");
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot1");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId()), Collections
.singletonList(learnerPeer.getPeerId())));
learnerServer = createService("unittest", learnerPeer, nodeOptions, List.of(peer, learnerPeer));
learnerServer.start();
}
{
// Start leader
NodeOptions nodeOptions = createNodeOptions(1);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
nodeOptions.setFsm(fsm);
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId()), Collections
.singletonList(learnerPeer.getPeerId())));
RaftGroupService server = createService("unittest", peer, nodeOptions, List.of(peer, learnerPeer));
Node node = server.start();
assertEquals(1, node.listPeers().size());
assertTrue(node.listPeers().contains(peer.getPeerId()));
assertTrue(waitForCondition(() -> node.isLeader(), 1_000));
sendTestTaskAndWait(node, cnt);
assertEquals(cnt, fsm.getLogs().size());
int i = 0;
for (ByteBuffer data : fsm.getLogs())
assertEquals("hello" + i++, stringFromBytes(data.array()));
Thread.sleep(1000); //wait for entries to be replicated to learner.
server.shutdown();
}
{
// assert learner fsm
assertEquals(cnt, learnerFsm.getLogs().size());
int i = 0;
for (ByteBuffer data : learnerFsm.getLogs())
assertEquals("hello" + i++, stringFromBytes(data.array()));
learnerServer.shutdown();
}
}
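/** Tests resetting and removing learners: removed learners stop receiving new entries while the remaining ones keep replicating. */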
@Test
public void testResetLearners() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
LinkedHashSet<TestPeer> learners = new LinkedHashSet<>();
for (int i = 0; i < 3; i++)
learners.add(new TestPeer(testInfo, TestUtils.INIT_PORT + 3 + i));
cluster = new TestCluster("unittest", dataPath, peers, learners, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
for (TestPeer peer : learners)
    assertTrue(cluster.startLearner(peer));
// elect leader
Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
waitForCondition(() -> leader.listAlivePeers().size() == 3, 5_000);
waitForCondition(() -> leader.listAliveLearners().size() == 3, 5_000);
sendTestTaskAndWait(leader);
List<MockStateMachine> fsms = cluster.getFsms();
assertEquals(6, fsms.size());
cluster.ensureSame();
{
// Reset learners to 2 nodes
TestPeer learnerPeer = learners.iterator().next();
learners.remove(learnerPeer);
assertEquals(2, learners.size());
SynchronizedClosure done = new SynchronizedClosure();
leader.resetLearners(learners.stream().map(TestPeer::getPeerId).collect(toList()), done);
assertTrue(done.await().isOk());
assertEquals(2, leader.listAliveLearners().size());
assertEquals(2, leader.listLearners().size());
sendTestTaskAndWait(leader);
Thread.sleep(500);
assertEquals(6, fsms.size());
MockStateMachine fsm = fsms.remove(3); // get the removed learner's fsm
assertEquals(fsm.getPeerId(), learnerPeer.getPeerId());
// Ensure no more log entries are replicated to the removed learner.
assertTrue(cluster.getLeaderFsm().getLogs().size() > fsm.getLogs().size());
assertEquals(cluster.getLeaderFsm().getLogs().size(), 2 * fsm.getLogs().size());
}
{
// remove another learner
TestPeer learnerPeer = learners.iterator().next();
SynchronizedClosure done = new SynchronizedClosure();
leader.removeLearners(Arrays.asList(learnerPeer.getPeerId()), done);
assertTrue(done.await().isOk());
sendTestTaskAndWait(leader);
Thread.sleep(500);
MockStateMachine fsm = fsms.remove(3); // get the removed learner's fsm
assertEquals(fsm.getPeerId(), learnerPeer.getPeerId());
// Ensure no more log entries are replicated to the removed learner.
assertTrue(cluster.getLeaderFsm().getLogs().size() > fsm.getLogs().size());
assertEquals(cluster.getLeaderFsm().getLogs().size(), fsm.getLogs().size() / 2 * 3);
}
assertEquals(3, leader.listAlivePeers().size());
assertEquals(1, leader.listAliveLearners().size());
assertEquals(1, leader.listLearners().size());
}
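/** Tests that a learner declared in the initial configuration is listed by the leader and becomes alive once it is started. */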
@Test
public void testTripleNodesWithStaticLearners() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
LinkedHashSet<TestPeer> learners = new LinkedHashSet<>();
TestPeer learnerPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 3);
learners.add(learnerPeer);
cluster.setLearners(learners);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(1, leader.listLearners().size());
assertTrue(leader.listLearners().contains(learnerPeer.getPeerId()));
assertTrue(leader.listAliveLearners().isEmpty());
// start learner after cluster setup.
assertTrue(cluster.start(learnerPeer));
Thread.sleep(1000);
assertEquals(3, leader.listPeers().size());
assertEquals(1, leader.listLearners().size());
assertEquals(1, leader.listAliveLearners().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
assertEquals(4, cluster.getFsms().size());
}
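/** Tests adding learners at runtime and that the leader cannot commit (EPERM) once both followers are stopped, even with learners alive. */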
@Test
public void testTripleNodesWithLearners() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
//elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
assertTrue(leader.listLearners().isEmpty());
assertTrue(leader.listAliveLearners().isEmpty());
{
// Adds a learner
SynchronizedClosure done = new SynchronizedClosure();
TestPeer learnerPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 3);
// Start learner
assertTrue(cluster.startLearner(learnerPeer));
leader.addLearners(Arrays.asList(learnerPeer.getPeerId()), done);
assertTrue(done.await().isOk());
assertEquals(1, leader.listAliveLearners().size());
assertEquals(1, leader.listLearners().size());
}
// apply tasks to leader
sendTestTaskAndWait(leader);
{
ByteBuffer data = ByteBuffer.wrap("no closure".getBytes(UTF_8));
Task task = new Task(data, null);
leader.apply(task);
}
{
// task with TaskClosure
ByteBuffer data = ByteBuffer.wrap("task closure".getBytes(UTF_8));
List<String> cbs = synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(1);
Task task = new Task(data, new TaskClosure() {
@Override
public void run(Status status) {
cbs.add("apply");
latch.countDown();
}
@Override
public void onCommitted() {
cbs.add("commit");
}
});
leader.apply(task);
latch.await();
assertEquals(2, cbs.size());
assertEquals("commit", cbs.get(0));
assertEquals("apply", cbs.get(1));
}
assertEquals(4, cluster.getFsms().size());
assertEquals(2, cluster.getFollowers().size());
assertEquals(1, cluster.getLearners().size());
cluster.ensureSame();
{
// Adds another learner
SynchronizedClosure done = new SynchronizedClosure();
TestPeer learnerPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 4);
// Start learner
assertTrue(cluster.startLearner(learnerPeer));
leader.addLearners(Arrays.asList(learnerPeer.getPeerId()), done);
assertTrue(done.await().isOk());
assertEquals(2, leader.listAliveLearners().size());
assertEquals(2, leader.listLearners().size());
cluster.ensureSame();
}
{
// stop two followers
for (Node follower : cluster.getFollowers())
assertTrue(cluster.stop(follower.getNodeId().getPeerId()));
// send a new task
ByteBuffer data = ByteBuffer.wrap("task closure".getBytes(UTF_8));
SynchronizedClosure done = new SynchronizedClosure();
leader.apply(new Task(data, done));
// should fail
assertFalse(done.await().isOk());
assertEquals(RaftError.EPERM, done.getStatus().getRaftError());
// One peer with two learners.
assertEquals(3, cluster.getFsms().size());
}
}
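/** Tests that the node with the highest election priority (100) becomes the leader. */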
@Test
public void testNodesWithPriorityElection() throws Exception {
List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(40);
priorities.add(40);
List<TestPeer> peers = TestUtils.generatePriorityPeers(testInfo, 3, priorities);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(100, leader.getNodeTargetPriority());
assertEquals(100, leader.getLeaderId().getPriority());
assertEquals(2, cluster.getFollowers().size());
}
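/** Tests leader election when one node opts out of priority-based election with priority -1. */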
@Test
public void testNodesWithPartPriorityElection() throws Exception {
List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(40);
priorities.add(-1);
List<TestPeer> peers = TestUtils.generatePriorityPeers(testInfo, 3, priorities);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
//elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
}
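/** Tests that a leader is still elected when priorities are a mix of 0 and -1. */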
@Test
public void testNodesWithSpecialPriorityElection() throws Exception {
List<Integer> priorities = new ArrayList<>();
priorities.add(0);
priorities.add(0);
priorities.add(-1);
List<TestPeer> peers = TestUtils.generatePriorityPeers(testInfo, 3, priorities);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
//elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
}
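/** Tests that the only node with a non-zero priority (50) is elected when the other nodes have priority 0. */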
@Test
public void testNodesWithZeroValPriorityElection() throws Exception {
List<Integer> priorities = new ArrayList<>();
priorities.add(50);
priorities.add(0);
priorities.add(0);
List<TestPeer> peers = TestUtils.generatePriorityPeers(testInfo, 3, priorities);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
//wait and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
assertEquals(2, cluster.getFollowers().size());
assertEquals(50, leader.getNodeTargetPriority());
assertEquals(50, leader.getLeaderId().getPriority());
}
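/** Tests that no leader is elected when all nodes have election priority 0. */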
@Test
public void testNoLeaderWithZeroValPriorityElection() throws Exception {
List<Integer> priorities = new ArrayList<>();
priorities.add(0);
priorities.add(0);
priorities.add(0);
List<TestPeer> peers = TestUtils.generatePriorityPeers(testInfo, 3, priorities);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Thread.sleep(200);
List<Node> followers = cluster.getFollowers();
assertEquals(3, followers.size());
for (Node follower : followers)
assertEquals(0, follower.getNodeId().getPeerId().getPriority());
}
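/** Tests that after stopping the priority-100 leader, the priority-60 node is elected while the target priority remains 100. */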
@Test
public void testLeaderStopAndReElectWithPriority() throws Exception {
List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(60);
priorities.add(10);
List<TestPeer> peers = TestUtils.generatePriorityPeers(testInfo, 3, priorities);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
assertNotNull(leader);
assertEquals(100, leader.getNodeId().getPeerId().getPriority());
assertEquals(100, leader.getNodeTargetPriority());
// apply tasks to leader
sendTestTaskAndWait(leader);
// wait until all updates are received before electing a new leader
cluster.ensureSame();
// stop leader
assertTrue(cluster.stop(leader.getNodeId().getPeerId()));
// elect new leader
leader = cluster.waitAndGetLeader();
assertNotNull(leader);
// nodes with the same log size will elect leader only by priority
assertEquals(60, leader.getNodeId().getPeerId().getPriority());
assertEquals(100, leader.getNodeTargetPriority());
}
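/** Tests that removing the priority-100 leader lowers the target priority to 60 and a different node is elected. */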
@Test
public void testRemoveLeaderWithPriority() throws Exception {
List<Integer> priorities = new ArrayList<>();
priorities.add(100);
priorities.add(60);
priorities.add(10);
List<TestPeer> peers = TestUtils.generatePriorityPeers(testInfo, 3, priorities);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
assertEquals(100, leader.getNodeTargetPriority());
assertEquals(100, leader.getNodeId().getPeerId().getPriority());
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId oldLeader = leader.getNodeId().getPeerId().copy();
// remove old leader
log.info("Remove old leader {}", oldLeader);
CountDownLatch latch = new CountDownLatch(1);
leader.removePeer(oldLeader, new ExpectClosure(latch));
waitLatch(latch);
assertEquals(60, leader.getNodeTargetPriority());
// stop and clean old leader
log.info("Stop and clean old leader {}", oldLeader);
assertTrue(cluster.stop(oldLeader));
cluster.clean(oldLeader);
// elect new leader
leader = cluster.waitAndGetLeader();
log.info("New leader is {}", leader);
assertNotNull(leader);
assertNotEquals(leader.getNodeId().getPeerId(), oldLeader);
}
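/** Tests replication across restarts with log entry checksum validation enabled on all peers, a single peer, or none. */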
@Test
public void testChecksum() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
// start with checksum validation
{
TestCluster cluster = new TestCluster("unittest", dataPath, peers, testInfo);
try {
RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(true);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true, null, raftOptions));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
cluster.stopAll();
}
}
// restart with checksum validation enabled on peer3 only
{
TestCluster cluster = new TestCluster("unittest", dataPath, peers, testInfo);
try {
RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(false);
for (TestPeer peer : peers) {
if (peer.equals(peers.get(2))) {
raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(true);
}
assertTrue(cluster.start(peer, false, 300, true, null, raftOptions));
}
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
cluster.stopAll();
}
}
// restart with no checksum validation
{
TestCluster cluster = new TestCluster("unittest", dataPath, peers, testInfo);
try {
RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(false);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true, null, raftOptions));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
cluster.stopAll();
}
}
// restart with checksum validation enabled on all peers
{
TestCluster cluster = new TestCluster("unittest", dataPath, peers, testInfo);
try {
RaftOptions raftOptions = new RaftOptions();
raftOptions.setEnableLogEntryChecksum(true);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true, null, raftOptions));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
cluster.ensureSame();
}
finally {
cluster.stopAll();
}
}
}
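/** Tests linearizable reads via readIndex on the leader, on followers, and with a null request context. */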
@Test
public void testReadIndex() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
//elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
// the first call may fail fast while connections are not yet established, so retry once
if (!assertReadIndex(leader, 11))
assertTrue(assertReadIndex(leader, 11));
// read from follower
for (Node follower : cluster.getFollowers()) {
assertNotNull(follower);
assertTrue(waitForCondition(() -> leader.getNodeId().getPeerId().equals(follower.getLeaderId()), 5_000));
assertReadIndex(follower, 11);
}
// read with null request context
CountDownLatch latch = new CountDownLatch(1);
leader.readIndex(null, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
assertNull(reqCtx);
assertTrue(status.isOk());
latch.countDown();
}
});
latch.await();
}
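/** Tests that a read-index request either succeeds quickly or fails with ETIMEDOUT and an index of -1. */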
@Test // TODO asch do we need read index timeout ? https://issues.apache.org/jira/browse/IGNITE-14832
public void testReadIndexTimeout() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
//elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
// the first call may fail fast while connections are not yet established, so retry once
if (!assertReadIndex(leader, 11))
assertTrue(assertReadIndex(leader, 11));
// read from follower
for (Node follower : cluster.getFollowers()) {
assertNotNull(follower);
assertTrue(waitForCondition(() -> leader.getNodeId().getPeerId().equals(follower.getLeaderId()), 5_000));
assertReadIndex(follower, 11);
}
// read with null request context
CountDownLatch latch = new CountDownLatch(1);
long start = System.currentTimeMillis();
leader.readIndex(null, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
assertNull(reqCtx);
if (status.isOk())
System.err.println("Read-index so fast: " + (System.currentTimeMillis() - start) + "ms");
else {
assertEquals(new Status(RaftError.ETIMEDOUT, "read-index request timeout"), status);
assertEquals(-1, index);
}
latch.countDown();
}
});
latch.await();
}
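/** Tests that readIndex requests can also be served through a learner node. */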
@Test
public void testReadIndexFromLearner() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
// elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
{
// Adds a learner
SynchronizedClosure done = new SynchronizedClosure();
TestPeer learnerPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 3);
// Start learner
assertTrue(cluster.startLearner(learnerPeer));
leader.addLearners(Arrays.asList(learnerPeer.getPeerId()), done);
assertTrue(done.await().isOk());
assertEquals(1, leader.listAliveLearners().size());
assertEquals(1, leader.listLearners().size());
}
Thread.sleep(100);
// read from learner
Node learner = cluster.getNodes().get(3);
assertNotNull(learner);
assertReadIndex(learner, 12);
assertReadIndex(learner, 12);
}
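/** Tests readIndex under concurrent load: ten threads interleave task application with reads on randomly chosen nodes. */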
@Test
public void testReadIndexChaos() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
//wait and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executor = Executors.newFixedThreadPool(10);
executors.add(executor);
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
/** {@inheritDoc} */
@Override public void run() {
try {
for (int i = 0; i < 100; i++) {
try {
sendTestTaskAndWait(leader);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
readIndexRandom(cluster);
}
}
finally {
latch.countDown();
}
}
private void readIndexRandom(TestCluster cluster) {
CountDownLatch readLatch = new CountDownLatch(1);
byte[] requestContext = TestUtils.getRandomBytes();
cluster.getNodes().get(ThreadLocalRandom.current().nextInt(3))
.readIndex(requestContext, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if (status.isOk()) {
assertTrue(index > 0);
assertArrayEquals(requestContext, reqCtx);
}
readLatch.countDown();
}
});
try {
readLatch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
latch.await();
cluster.ensureSame();
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10000, fsm.getLogs().size());
}
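/** Tests that node metrics are collected and can be reported for every node in the group. */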
@Test
public void testNodeMetrics() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
//elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
sendTestTaskAndWait(leader);
{
ByteBuffer data = ByteBuffer.wrap("no closure".getBytes(UTF_8));
Task task = new Task(data, null);
leader.apply(task);
}
cluster.ensureSame();
for (Node node : cluster.getNodes()) {
System.out.println("-------------" + node.getNodeId() + "-------------");
ConsoleReporter reporter = ConsoleReporter.forRegistry(node.getNodeMetrics().getMetricRegistry())
.build();
reporter.report();
reporter.close();
System.out.println();
}
// TODO check http status https://issues.apache.org/jira/browse/IGNITE-14832
assertEquals(2, cluster.getFollowers().size());
}
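/** Tests leader failover: writes fail while there is no leader, a new leader keeps accepting tasks, and the old leader rejoins both with and without cleaned storage. */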
@Test
public void testLeaderFail() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
log.info("Current leader is {}", leader.getLeaderId());
// apply tasks to leader
sendTestTaskAndWait(leader);
List<Node> followers = cluster.getFollowers();
blockMessagesOnFollowers(followers, (msg, nodeId) -> {
if (msg instanceof RpcRequests.RequestVoteRequest) {
RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
return !msg0.preVote();
}
return false;
});
// stop leader
log.warn("Stop leader {}", leader.getNodeId().getPeerId());
PeerId oldLeader = leader.getNodeId().getPeerId();
assertTrue(cluster.stop(leader.getNodeId().getPeerId()));
assertFalse(followers.isEmpty());
int success = sendTestTaskAndWait("follower apply ", followers.get(0), 10, -1); // Should fail, because no leader.
stopBlockingMessagesOnFollowers(followers);
// elect new leader
leader = cluster.waitAndGetLeader();
log.info("Elect new leader is {}", leader.getLeaderId());
// apply tasks to new leader
CountDownLatch latch = new CountDownLatch(10);
for (int i = 10; i < 20; i++) {
ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes(UTF_8));
Task task = new Task(data, new ExpectClosure(latch));
leader.apply(task);
}
waitLatch(latch);
// restart old leader
log.info("restart old leader {}", oldLeader);
assertTrue(cluster.start(findById(peers, oldLeader)));
// apply something
latch = new CountDownLatch(10);
for (int i = 20; i < 30; i++) {
ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes(UTF_8));
Task task = new Task(data, new ExpectClosure(latch));
leader.apply(task);
}
waitLatch(latch);
// stop and clean old leader
cluster.stop(oldLeader);
cluster.clean(oldLeader);
// restart old leader
log.info("Restart old leader with cleanup {}", oldLeader);
assertTrue(cluster.start(findById(peers, oldLeader)));
cluster.ensureSame();
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(30 + success, fsm.getLogs().size());
}
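/** Tests growing a single-node group peer by peer, covering catch-up failures (ECATCHUP), concurrent configuration changes (EBUSY), and duplicate adds. */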
@Test
public void testJoinNodes() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
TestPeer peer1 = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
TestPeer peer2 = new TestPeer(testInfo, TestUtils.INIT_PORT + 2);
TestPeer peer3 = new TestPeer(testInfo, TestUtils.INIT_PORT + 3);
ArrayList<TestPeer> peers = new ArrayList<>();
peers.add(peer0);
// start single cluster
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(leader.getNodeId().getPeerId(), peer0.getPeerId());
sendTestTaskAndWait(leader);
// start peer1
assertTrue(cluster.start(peer1, false, 300));
// add peer1
CountDownLatch latch = new CountDownLatch(1);
peers.add(peer1);
leader.addPeer(peer1.getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
cluster.ensureSame();
assertEquals(2, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
// add peer2, which is not started yet
peers.add(peer2);
latch = new CountDownLatch(1);
leader.addPeer(peer2.getPeerId(), new ExpectClosure(RaftError.ECATCHUP, latch));
waitLatch(latch);
// start peer2 after 2 seconds
Thread.sleep(2000);
assertTrue(cluster.start(peer2, false, 300));
// re-add peer2
latch = new CountDownLatch(2);
leader.addPeer(peer2.getPeerId(), new ExpectClosure(latch));
// concurrent configuration change
leader.addPeer(peer3.getPeerId(), new ExpectClosure(RaftError.EBUSY, latch));
waitLatch(latch);
// re-add peer2 directly
try {
leader.addPeer(peer2.getPeerId(), new ExpectClosure(latch));
fail();
}
catch (IllegalArgumentException e) {
assertEquals("Peer already exists in current configuration", e.getMessage());
}
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
assertEquals(2, cluster.getFollowers().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
}
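/** Tests removing a follower from the group and re-adding it after a restart with cleaned storage. */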
@Test
public void testRemoveFollower() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// wait and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
TestPeer followerPeer = findById(peers, followers.get(0).getNodeId().getPeerId());
// stop and clean follower
log.info("Stop and clean follower {}", followerPeer);
assertTrue(cluster.stop(followerPeer.getPeerId()));
cluster.clean(followerPeer.getPeerId());
// remove follower
log.info("Remove follower {}", followerPeer);
CountDownLatch latch = new CountDownLatch(1);
leader.removePeer(followerPeer.getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
followers = cluster.getFollowers();
assertEquals(1, followers.size());
peers = TestUtils.generatePeers(testInfo, 3);
assertTrue(peers.remove(followerPeer));
// start follower
log.info("Start and add follower {}", followerPeer);
assertTrue(cluster.start(followerPeer));
// re-add follower
latch = new CountDownLatch(1);
leader.addPeer(followerPeer.getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
followers = cluster.getFollowers();
assertEquals(2, followers.size());
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
}
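/** Tests removing the current leader, electing a new one, and re-adding the old leader as a follower. */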
@Test
public void testRemoveLeader() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
//elect and get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId leaderId = leader.getNodeId().getPeerId();
TestPeer oldLeader = findById(peers, leaderId);
// remove old leader
log.info("Remove old leader {}", oldLeader);
CountDownLatch latch = new CountDownLatch(1);
leader.removePeer(oldLeader.getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
// elect new leader
leader = cluster.waitAndGetLeader();
log.info("New leader is {}", leader);
assertNotNull(leader);
// apply tasks to new leader
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// stop and clean old leader
log.info("Stop and clean old leader {}", oldLeader);
assertTrue(cluster.stop(oldLeader.getPeerId()));
cluster.clean(oldLeader.getPeerId());
// Add and start old leader
log.info("Start and add old leader {}", oldLeader);
assertTrue(cluster.start(oldLeader));
peers = TestUtils.generatePeers(testInfo, 3);
assertTrue(peers.remove(oldLeader));
latch = new CountDownLatch(1);
leader.addPeer(oldLeader.getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
followers = cluster.getFollowers();
assertEquals(2, followers.size());
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
}
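/** Tests that pre-vote keeps the leader's term unchanged while a follower is removed and later re-added. */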
@Test
public void testPreVote() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// get leader
Node leader = cluster.waitAndGetLeader();
long savedTerm = ((NodeImpl) leader).getCurrentTerm();
assertNotNull(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId followerPeerId = followers.get(0).getNodeId().getPeerId();
TestPeer followerPeer = findById(peers, followerPeerId);
// remove follower
log.info("Remove follower {}", followerPeer);
CountDownLatch latch = new CountDownLatch(1);
leader.removePeer(followerPeer.getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
Thread.sleep(2000);
// add follower
log.info("Add follower {}", followerPeer);
peers = TestUtils.generatePeers(testInfo, 3);
assertTrue(peers.remove(followerPeer));
latch = new CountDownLatch(1);
leader.addPeer(followerPeer.getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
leader = cluster.getLeader();
assertNotNull(leader);
// leader term should not be changed.
assertEquals(savedTerm, ((NodeImpl) leader).getCurrentTerm());
}
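/** Tests resetPeers on a node bootstrapped with an empty configuration. */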
@Test
public void testSetPeer1() throws Exception {
cluster = new TestCluster("testSetPeer1", dataPath, new ArrayList<>(), testInfo);
TestPeer bootPeer = new TestPeer(testInfo, TestUtils.INIT_PORT);
assertTrue(cluster.start(bootPeer));
List<Node> nodes = cluster.getFollowers();
assertEquals(1, nodes.size());
List<PeerId> peers = new ArrayList<>();
peers.add(bootPeer.getPeerId());
// reset peers from empty
assertTrue(nodes.get(0).resetPeers(new Configuration(peers)).isOk());
assertNotNull(cluster.waitAndGetLeader());
}
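/** Tests recovering a group via resetPeers after the quorum is lost, then re-adding the stopped followers. */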
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-21457")
public void testSetPeer2() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId followerPeer1 = followers.get(0).getNodeId().getPeerId();
PeerId followerPeer2 = followers.get(1).getNodeId().getPeerId();
log.info("Stop and clean follower {}", followerPeer1);
assertTrue(cluster.stop(followerPeer1));
cluster.clean(followerPeer1);
// apply tasks to leader again
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// set peers while no quorum has died
PeerId leaderId = leader.getLeaderId().copy();
log.info("Set peers to {}", leaderId);
log.info("Stop and clean follower {}", followerPeer2);
assertTrue(cluster.stop(followerPeer2));
cluster.clean(followerPeer2);
assertTrue(waitForTopology(cluster, leaderId, 1, 5_000));
// the leader will step down and become a follower
Thread.sleep(2000);
List<PeerId> newPeers = new ArrayList<>();
newPeers.add(leaderId);
// the new peers equal the current conf, so the reset should succeed
assertTrue(leader.resetPeers(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList()))).isOk());
// set peers after the quorum has died
log.warn("Set peers to {}", leaderId);
assertTrue(leader.resetPeers(new Configuration(newPeers)).isOk());
leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(leaderId, leader.getNodeId().getPeerId());
log.info("start follower {}", followerPeer1);
assertTrue(cluster.start(findById(peers, followerPeer1), true, 300));
log.info("start follower {}", followerPeer2);
assertTrue(cluster.start(findById(peers, followerPeer2), true, 300));
assertTrue(waitForTopology(cluster, followerPeer1, 3, 10_000));
assertTrue(waitForTopology(cluster, followerPeer2, 3, 10_000));
CountDownLatch latch = new CountDownLatch(1);
log.info("Add old follower {}", followerPeer1);
leader.addPeer(followerPeer1, new ExpectClosure(latch));
waitLatch(latch);
latch = new CountDownLatch(1);
log.info("Add old follower {}", followerPeer2);
leader.addPeer(followerPeer2, new ExpectClosure(latch));
waitLatch(latch);
newPeers.add(followerPeer1);
newPeers.add(followerPeer2);
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
}
/** Tests that a leader snapshot followed by a leader restart converges the group, with the acting leader never loading a snapshot. */
@Test
public void testRestoreSnapshot() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// get leader
Node leader = cluster.waitAndGetLeader();
log.info("Leader: " + leader);
assertNotNull(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
triggerLeaderSnapshot(cluster, leader);
// stop leader
PeerId leaderAddr = leader.getNodeId().getPeerId().copy();
assertTrue(cluster.stop(leaderAddr));
// restart leader
cluster.waitAndGetLeader();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
assertTrue(cluster.start(findById(peers, leaderAddr)));
cluster.ensureSame();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
}
/**
* Tests restoring state from a snapshot plus log entries applied after it.
*
* @throws Exception If failed.
*/
@Test
public void testRestoreSnapshotWithDelta() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// get leader
Node leader = cluster.waitAndGetLeader();
log.info("Leader: " + leader);
assertNotNull(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
triggerLeaderSnapshot(cluster, leader);
// stop leader
PeerId leaderAddr = leader.getNodeId().getPeerId().copy();
assertTrue(cluster.stop(leaderAddr));
// Elect a new leader and apply more tasks before restarting the old leader.
sendTestTaskAndWait(cluster.waitAndGetLeader(), 10, RaftError.SUCCESS);
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
assertTrue(cluster.start(findById(peers, leaderAddr)));
Node oldLeader = cluster.getNode(leaderAddr);
cluster.ensureSame();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
MockStateMachine fsm = (MockStateMachine) oldLeader.getOptions().getFsm();
assertEquals(1, fsm.getLoadSnapshotTimes());
}
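/**
* Tests installing a snapshot on a restarted follower with snapshot throughput throttling enabled.
*/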
@Test
public void testInstallSnapshotWithThrottle() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 200, false, new ThroughputSnapshotThrottle(1024, 1)));
// get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
// stop follower1
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId followerAddr = followers.get(0).getNodeId().getPeerId();
assertTrue(cluster.stop(followerAddr));
cluster.waitAndGetLeader();
// apply something more
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
Thread.sleep(1000);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader);
// apply something more
sendTestTaskAndWait(leader, 20, RaftError.SUCCESS);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader, 2);
// Wait for the leader to compact logs.
Thread.sleep(1000);
// restart follower.
cluster.clean(followerAddr);
assertTrue(cluster.start(findById(peers, followerAddr), true, 300, false, new ThroughputSnapshotThrottle(1024, 1)));
Thread.sleep(2000);
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(30, fsm.getLogs().size());
}
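/**
* Tests installing a large snapshot on a newly added follower with snapshot throughput throttling enabled.
*/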
@Test // TODO add test for timeout on snapshot install https://issues.apache.org/jira/browse/IGNITE-14832
@Disabled("https://issues.apache.org/jira/browse/IGNITE-16467")
public void testInstallLargeSnapshotWithThrottle() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 4);
cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3), testInfo);
for (int i = 0; i < peers.size() - 1; i++) {
TestPeer peer = peers.get(i);
boolean started = cluster.start(peer, false, 200, false);
assertTrue(started);
}
// get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
// apply tasks to leader
sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
cluster.ensureSame();
// apply something more
for (int i = 1; i < 100; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
Thread.sleep(1000);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader);
// apply something more
for (int i = 100; i < 200; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader, 2);
// Wait for the leader to compact logs.
Thread.sleep(1000);
// add follower
TestPeer newPeer = peers.get(3);
SnapshotThrottle snapshotThrottle = new ThroughputSnapshotThrottle(128, 1);
boolean started = cluster.start(newPeer, false, 300, false, snapshotThrottle);
assertTrue(started);
CountDownLatch latch = new CountDownLatch(1);
leader.addPeer(newPeer.getPeerId(), status -> {
assertTrue(status.isOk(), status.toString());
latch.countDown();
});
waitLatch(latch);
cluster.ensureSame();
assertEquals(4, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(2000, fsm.getLogs().size());
}
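/**
* Tests installing a large snapshot on a newly added follower while limiting RPC payloads via maxByteCountPerRpc.
*/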
@Test
public void testInstallLargeSnapshot() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 4);
cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3), testInfo);
for (int i = 0; i < peers.size() - 1; i++) {
TestPeer peer = peers.get(i);
boolean started = cluster.start(peer, false, 200, false);
assertTrue(started);
}
// get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
// apply tasks to leader
sendTestTaskAndWait(leader, 0, RaftError.SUCCESS);
cluster.ensureSame();
// apply something more
for (int i = 1; i < 100; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
Thread.sleep(1000);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader);
// apply something more
for (int i = 100; i < 200; i++)
sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader, 2);
// Wait for the leader to compact logs.
Thread.sleep(1000);
// add follower
TestPeer newPeer = peers.get(3);
RaftOptions raftOptions = new RaftOptions();
raftOptions.setMaxByteCountPerRpc(128);
boolean started = cluster.start(newPeer, false, 300, false, null, raftOptions);
assertTrue(started);
CountDownLatch latch = new CountDownLatch(1);
leader.addPeer(newPeer.getPeerId(), status -> {
assertTrue(status.isOk(), status.toString());
latch.countDown();
});
waitLatch(latch);
cluster.ensureSame();
assertEquals(4, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(2000, fsm.getLogs().size());
}
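/**
* Tests installing a snapshot on a follower restarted from a clean state after log compaction.
*/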
@Test
public void testInstallSnapshot() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// get leader
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
cluster.ensureSame();
// stop follower1
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId followerAddr = followers.get(0).getNodeId().getPeerId();
assertTrue(cluster.stop(followerAddr));
// apply something more
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// trigger leader snapshot
triggerLeaderSnapshot(cluster, leader);
// apply something more
sendTestTaskAndWait(leader, 20, RaftError.SUCCESS);
triggerLeaderSnapshot(cluster, leader, 2);
// Wait for the leader to compact logs.
Thread.sleep(50);
// Restart follower.
cluster.clean(followerAddr);
assertTrue(cluster.start(findById(peers, followerAddr), false, 300));
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(30, fsm.getLogs().size(), fsm.getPeerId().toString());
}
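/**
* Tests that triggering a snapshot fails with EINVAL when no snapshot URI is configured.
*/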
@Test
public void testNoSnapshot() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
NodeOptions nodeOptions = createNodeOptions(0);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
nodeOptions.setFsm(fsm);
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId())));
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
Node node = service.start();
// Wait for the node to elect itself as leader.
Thread.sleep(2000);
sendTestTaskAndWait(node);
assertEquals(0, fsm.getSaveSnapshotTimes());
// Trigger a snapshot; it fails because no snapshot storage is configured.
CountDownLatch latch = new CountDownLatch(1);
node.snapshot(new ExpectClosure(RaftError.EINVAL, "Snapshot is not supported", latch));
waitLatch(latch);
assertEquals(0, fsm.getSaveSnapshotTimes());
}
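/**
* Tests that snapshots are taken automatically when a snapshot interval is configured.
*/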
@Test
public void testAutoSnapshot() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
NodeOptions nodeOptions = createNodeOptions(0);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
nodeOptions.setFsm(fsm);
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotIntervalSecs(10);
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId())));
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
Node node = service.start();
// Wait for the node to elect itself as leader.
Thread.sleep(2000);
sendTestTaskAndWait(node);
// wait for auto snapshot
Thread.sleep(10000);
// The first snapshot is triggered at a random time, so only a lower bound is asserted.
int times = fsm.getSaveSnapshotTimes();
assertTrue(times >= 1, "snapshotTimes=" + times);
assertTrue(fsm.getSnapshotIndex() > 0);
}
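/**
* Tests that a stable cluster keeps the same leader and term over time.
*/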
@Test
public void testLeaderShouldNotChange() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
// get leader
Node leader0 = cluster.waitAndGetLeader();
assertNotNull(leader0);
long savedTerm = ((NodeImpl) leader0).getCurrentTerm();
log.info("Current leader is {}, term is {}", leader0, savedTerm);
Thread.sleep(5000);
Node leader1 = cluster.waitAndGetLeader();
assertNotNull(leader1);
log.info("Current leader is {}", leader1);
assertEquals(savedTerm, ((NodeImpl) leader1).getCurrentTerm());
}
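/**
* Tests that a stopped follower catches up with the group after being restarted.
*/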
@Test
public void testRecoverFollower() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId followerAddr = followers.get(0).getNodeId().getPeerId().copy();
assertTrue(cluster.stop(followerAddr));
sendTestTaskAndWait(leader);
for (int i = 10; i < 30; i++) {
ByteBuffer data = ByteBuffer.wrap(("no cluster" + i).getBytes(UTF_8));
Task task = new Task(data, null);
leader.apply(task);
}
// Wait for the leader to compact logs.
Thread.sleep(5000);
// restart follower
assertTrue(cluster.start(findById(peers, followerAddr)));
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(30, fsm.getLogs().size());
}
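/**
* Tests transferring leadership to a chosen follower.
*/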
@Test
public void testLeaderTransfer() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
sendTestTaskAndWait(leader);
Thread.sleep(100);
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
log.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
leader = cluster.waitAndGetLeader();
assertEquals(leader.getNodeId().getPeerId(), targetPeer);
}
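/**
* Tests leadership transfer to a stopped peer whose log is incomplete: applies fail with EBUSY and another node becomes leader.
*/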
@Test
public void testLeaderTransferBeforeLogIsCompleted() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 1));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
assertTrue(cluster.stop(targetPeer));
sendTestTaskAndWait(leader);
log.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
CountDownLatch latch = new CountDownLatch(1);
Task task = new Task(ByteBuffer.wrap("aaaaa".getBytes(UTF_8)), new ExpectClosure(RaftError.EBUSY, latch));
leader.apply(task);
waitLatch(latch);
cluster.waitAndGetLeader();
assertTrue(cluster.start(findById(peers, targetPeer)));
leader = cluster.getLeader();
assertNotEquals(targetPeer, leader.getNodeId().getPeerId());
cluster.ensureSame();
}
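/**
* Tests that the leader keeps leadership when a transfer to a stopped peer fails, and accepts tasks again afterwards.
*/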
@Test
public void testLeaderTransferResumeOnFailure() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 1));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
assertTrue(cluster.stop(targetPeer));
sendTestTaskAndWait(leader);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
Node savedLeader = leader;
// Try to apply a task while leadership is being transferred.
CountDownLatch latch = new CountDownLatch(1);
Task task = new Task(ByteBuffer.wrap("aaaaa".getBytes(UTF_8)), new ExpectClosure(RaftError.EBUSY, latch));
leader.apply(task);
waitLatch(latch);
Thread.sleep(100);
leader = cluster.waitAndGetLeader();
assertSame(leader, savedLeader);
// restart target peer
assertTrue(cluster.start(findById(peers, targetPeer)));
Thread.sleep(100);
// retry apply task
latch = new CountDownLatch(1);
task = new Task(ByteBuffer.wrap("aaaaa".getBytes(UTF_8)), new ExpectClosure(latch));
leader.apply(task);
waitLatch(latch);
cluster.ensureSame();
}
/**
* Mock state machine that fails to load a snapshot.
*/
static class MockFSM1 extends MockStateMachine {
MockFSM1(PeerId peerId) {
super(peerId);
}
/** {@inheritDoc} */
@Override
public boolean onSnapshotLoad(SnapshotReader reader) {
return false;
}
}
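/**
* Tests that starting a node fails when the state machine cannot load the existing snapshot.
*/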
@Test
public void testShutdownAndJoinWorkAfterInitFails() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
{
NodeOptions nodeOptions = createNodeOptions(0);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
nodeOptions.setFsm(fsm);
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotIntervalSecs(10);
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId())));
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
Node node = service.start();
Thread.sleep(1000);
sendTestTaskAndWait(node);
// save snapshot
CountDownLatch latch = new CountDownLatch(1);
node.snapshot(new ExpectClosure(latch));
waitLatch(latch);
service.shutdown();
}
{
NodeOptions nodeOptions = createNodeOptions(1);
MockStateMachine fsm = new MockFSM1(peer.getPeerId());
nodeOptions.setFsm(fsm);
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOptions.setSnapshotIntervalSecs(10);
nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer.getPeerId())));
RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
try {
service.start();
fail();
}
catch (Exception e) {
// Expected.
}
}
}
/**
* Tests removing the current leader (Raft dissertation, section 4.2.2).
*
* @throws Exception If failed.
*/
@Test
public void testShuttingDownLeaderTriggerTimeoutNow() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
Node oldLeader = leader;
log.info("Shutdown leader {}", leader);
leader.shutdown();
leader.join();
leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertNotSame(leader, oldLeader);
}
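/**
* Tests that removing the current leader from the configuration triggers an immediate re-election.
*/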
@Test
public void testRemovingLeaderTriggerTimeoutNow() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
cluster.waitAndGetLeader();
// Ensure the quorum before removing a leader, otherwise removePeer can be rejected.
for (Node follower : cluster.getFollowers())
assertTrue(waitForCondition(() -> follower.getLeaderId() != null, 5_000));
Node leader = cluster.getLeader();
assertNotNull(leader);
Node oldLeader = leader;
CountDownLatch latch = new CountDownLatch(1);
oldLeader.removePeer(oldLeader.getNodeId().getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertNotSame(leader, oldLeader);
}
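/**
* Tests that leadership can be transferred to a peer that has just caught up via snapshot installation.
*/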
@Test
public void testTransferShouldWorkAfterInstallSnapshot() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (int i = 0; i < peers.size() - 1; i++)
assertTrue(cluster.start(peers.get(i)));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
sendTestTaskAndWait(leader);
List<Node> followers = cluster.getFollowers();
assertEquals(1, followers.size());
PeerId follower = followers.get(0).getNodeId().getPeerId();
assertTrue(leader.transferLeadershipTo(follower).isOk());
leader = cluster.waitAndGetLeader();
assertEquals(follower, leader.getNodeId().getPeerId());
CountDownLatch latch = new CountDownLatch(1);
leader.snapshot(new ExpectClosure(latch));
waitLatch(latch);
latch = new CountDownLatch(1);
leader.snapshot(new ExpectClosure(latch));
waitLatch(latch);
// Start the last peer, which should recover from the snapshot.
TestPeer lastPeer = peers.get(2);
assertTrue(cluster.start(lastPeer));
Thread.sleep(5000);
assertTrue(leader.transferLeadershipTo(lastPeer.getPeerId()).isOk());
Thread.sleep(2000);
leader = cluster.getLeader();
assertEquals(lastPeer.getPeerId(), leader.getNodeId().getPeerId());
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
}
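/**
* Tests that replication recovers after a follower was put into an error state and later restarted.
*/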
@Test
public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception {
// start five nodes
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 5);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node oldLeader = cluster.waitAndGetLeader();
assertNotNull(oldLeader);
// apply something
sendTestTaskAndWait(oldLeader);
// set one follower into error state
List<Node> followers = cluster.getFollowers();
assertEquals(4, followers.size());
Node errorNode = followers.get(0);
PeerId errorPeer = errorNode.getNodeId().getPeerId().copy();
log.info("Set follower {} into error state", errorNode);
((NodeImpl) errorNode).onError(new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(-1,
"Follower has something wrong.")));
// increase term by stopping leader and electing a new leader again
PeerId oldLeaderAddr = oldLeader.getNodeId().getPeerId().copy();
assertTrue(cluster.stop(oldLeaderAddr));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
log.info("Elect a new leader {}", leader);
// apply something again
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
// stop error follower
Thread.sleep(20);
log.info("Stop error follower {}", errorNode);
assertTrue(cluster.stop(errorPeer));
// restart error and old leader
log.info("Restart error follower {} and old leader {}", errorPeer, oldLeaderAddr);
assertTrue(cluster.start(findById(peers, errorPeer)));
assertTrue(cluster.start(findById(peers, oldLeaderAddr)));
cluster.ensureSame();
assertEquals(5, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(20, fsm.getLogs().size());
}
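/**
* Tests onStartFollowing/onStopFollowing callback counts across leader changes and a leadership transfer.
*/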
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-21792")
public void testFollowerStartStopFollowing() throws Exception {
// start five nodes
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 5);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node firstLeader = cluster.waitAndGetLeader();
assertNotNull(firstLeader);
cluster.ensureLeader(firstLeader);
// apply something
sendTestTaskAndWait(firstLeader);
// Assert the onStartFollowing/onStopFollowing counters.
List<Node> firstFollowers = cluster.getFollowers();
assertEquals(4, firstFollowers.size());
for (Node node : firstFollowers) {
assertTrue(
waitForCondition(() -> ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes() == 1, 5_000));
assertEquals(0, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes());
}
// stop leader and elect new one
PeerId fstLeaderAddr = firstLeader.getNodeId().getPeerId();
assertTrue(cluster.stop(fstLeaderAddr));
Node secondLeader = cluster.waitAndGetLeader();
assertNotNull(secondLeader);
sendTestTaskAndWait(secondLeader, 10, RaftError.SUCCESS);
// Verify the start/stop following counters after the leader change.
List<Node> secondFollowers = cluster.getFollowers();
assertEquals(3, secondFollowers.size());
for (Node node : secondFollowers) {
assertTrue(
waitForCondition(() -> ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes() == 2, 5_000));
assertEquals(1, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes());
}
// transfer leadership to a follower
PeerId targetPeer = secondFollowers.get(0).getNodeId().getPeerId().copy();
assertTrue(secondLeader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(100);
Node thirdLeader = cluster.waitAndGetLeader();
assertEquals(targetPeer, thirdLeader.getNodeId().getPeerId());
sendTestTaskAndWait(thirdLeader, 20, RaftError.SUCCESS);
List<Node> thirdFollowers = cluster.getFollowers();
assertEquals(3, thirdFollowers.size());
for (int i = 0; i < 3; i++) {
Node follower = thirdFollowers.get(i);
if (follower.getNodeId().getPeerId().equals(secondLeader.getNodeId().getPeerId())) {
assertTrue(
waitForCondition(() -> ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes() == 2, 5_000));
assertEquals(1,
((MockStateMachine) follower.getOptions().getFsm()).getOnStopFollowingTimes());
continue;
}
assertTrue(waitForCondition(() -> ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes() == 3, 5_000));
assertEquals(2, ((MockStateMachine) follower.getOptions().getFsm()).getOnStopFollowingTimes());
}
cluster.ensureSame();
}
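/**
* Tests that a follower keeps following the current leader when a stale vote request is delivered late.
*/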
@Test
public void testLeaderPropagatedBeforeVote() throws Exception {
// start three nodes
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, 3_000, testInfo);
for (TestPeer peer : peers) {
RaftOptions opts = new RaftOptions();
opts.setElectionHeartbeatFactor(4); // Election timeout divisor.
assertTrue(cluster.start(peer, false, 300, false, null, opts));
}
List<NodeImpl> nodes = cluster.getNodes();
AtomicReference<String> guard = new AtomicReference<>();
// Block only one vote message.
for (NodeImpl node : nodes) {
RpcClientEx rpcClientEx = sender(node);
rpcClientEx.recordMessages((msg, nodeId) -> true);
rpcClientEx.blockMessages((msg, nodeId) -> {
if (msg instanceof RpcRequests.RequestVoteRequest) {
RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
if (msg0.preVote())
return false;
if (guard.compareAndSet(null, nodeId))
return true;
}
if (msg instanceof RpcRequests.AppendEntriesRequest && nodeId.equals(guard.get())) {
RpcRequests.AppendEntriesRequest tmp = (RpcRequests.AppendEntriesRequest) msg;
if (tmp.entriesList() != null && !tmp.entriesList().isEmpty()) {
return true;
}
}
return false;
});
}
Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
RpcClientEx client = sender(leader);
client.stopBlock(1); // Unblock vote message.
// The follower shouldn't stop following on receiving a stale vote request.
Node follower = cluster.getNode(PeerId.parsePeer(guard.get()));
boolean res =
waitForCondition(() -> ((MockStateMachine) follower.getOptions().getFsm()).getOnStopFollowingTimes() != 0, 1_000);
assertFalse(res, "The follower shouldn't stop following");
}
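/**
* Tests readCommittedUserLog index translation across CONFIGURATION entries, snapshots and log compaction.
*/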
@Test
public void readCommittedUserLog() throws Exception {
// setup cluster
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unitest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
int amount = 10;
sendTestTaskAndWait(leader, amount);
assertTrue(waitForCondition(() -> {
try {
// index == 1 is a CONFIGURATION log
UserLog userLog = leader.readCommittedUserLog(1 + amount);
return userLog != null;
} catch (Exception ignore) {
// There is a gap between task is applied to FSM and FSMCallerImpl.lastAppliedIndex
// is updated, so we need to wait.
return false;
}
}, 10_000));
// index == 1 is a CONFIGURATION log, so real_index will be 2 when returned.
UserLog userLog = leader.readCommittedUserLog(1);
assertNotNull(userLog);
assertEquals(2, userLog.getIndex());
assertEquals("hello0", stringFromBytes(byteBufferToByteArray(userLog.getData())));
// index == 5 is a DATA log(a user log)
userLog = leader.readCommittedUserLog(5);
assertNotNull(userLog);
assertEquals(5, userLog.getIndex());
assertEquals("hello3", stringFromBytes(byteBufferToByteArray(userLog.getData())));
// index == 15 is greater than last_committed_index
try {
assertNull(leader.readCommittedUserLog(15));
fail();
}
catch (LogIndexOutOfBoundsException e) {
assertEquals("Request index 15 is greater than lastAppliedIndex: 11", e.getMessage());
}
// index == 0 is an invalid request
try {
assertNull(leader.readCommittedUserLog(0));
fail();
}
catch (LogIndexOutOfBoundsException e) {
assertEquals("Request index is invalid: 0", e.getMessage());
}
log.info("Trigger leader snapshot");
CountDownLatch latch = new CountDownLatch(1);
leader.snapshot(new ExpectClosure(latch));
waitLatch(latch);
// remove and add a peer to add two CONFIGURATION logs
List<Node> followers = cluster.getFollowers();
assertEquals(2, followers.size());
Node testFollower = followers.get(0);
latch = new CountDownLatch(1);
leader.removePeer(testFollower.getNodeId().getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
latch = new CountDownLatch(1);
leader.addPeer(testFollower.getNodeId().getPeerId(), new ExpectClosure(latch));
waitLatch(latch);
sendTestTaskAndWait(leader, amount, RaftError.SUCCESS);
// Trigger leader snapshot for the second time; after this, logs at indexes 1-11 will be deleted.
log.info("Trigger leader snapshot");
latch = new CountDownLatch(1);
leader.snapshot(new ExpectClosure(latch));
waitLatch(latch);
Thread.sleep(100);
// index == 5 log has been deleted in log_storage.
try {
leader.readCommittedUserLog(5);
fail();
}
catch (LogNotFoundException e) {
assertEquals("User log is deleted at index: 5", e.getMessage());
}
// index == 12, 13, 14 and 15 are four CONFIGURATION logs (joint consensus), so the real index will be 16 when returned.
userLog = leader.readCommittedUserLog(12);
assertNotNull(userLog);
assertEquals(16, userLog.getIndex());
assertEquals("hello10", stringFromBytes(byteBufferToByteArray(userLog.getData())));
// now index == 17 is a user log
userLog = leader.readCommittedUserLog(17);
assertNotNull(userLog);
assertEquals(17, userLog.getIndex());
assertEquals("hello11", stringFromBytes(byteBufferToByteArray(userLog.getData())));
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms()) {
assertEquals(20, fsm.getLogs().size());
for (int i = 0; i < 20; i++)
assertEquals("hello" + i, stringFromBytes(byteBufferToByteArray(fsm.getLogs().get(i))));
}
}
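/**
* Tests bootstrapping a group from a snapshot and restarting a node on top of it.
*/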
@Test
public void testBootStrapWithSnapshot() throws Exception {
TestPeer peer = new TestPeer(testInfo, 5006);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
Path path = Path.of(dataPath, "node0", "log");
Files.createDirectories(path);
for (char ch = 'a'; ch <= 'z'; ch++)
fsm.getLogs().add(ByteBuffer.wrap(new byte[] {(byte) ch}));
BootstrapOptions opts = new BootstrapOptions();
DefaultLogStorageFactory logStorageProvider = new DefaultLogStorageFactory(path);
logStorageProvider.start();
opts.setServiceFactory(new IgniteJraftServiceFactory(logStorageProvider));
opts.setLastLogIndex(fsm.getLogs().size());
opts.setRaftMetaUri(dataPath + File.separator + "meta");
opts.setSnapshotUri(dataPath + File.separator + "snapshot");
opts.setLogUri("test");
opts.setGroupConf(JRaftUtils.getConfiguration(peer.getPeerId().toString()));
opts.setFsm(fsm);
assertTrue(JRaftUtils.bootstrap(opts));
logStorageProvider.close();
NodeOptions nodeOpts = new NodeOptions();
nodeOpts.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOpts.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOpts.setLogUri("test");
DefaultLogStorageFactory log2 = new DefaultLogStorageFactory(path);
log2.start();
nodeOpts.setServiceFactory(new IgniteJraftServiceFactory(log2));
nodeOpts.setFsm(fsm);
RaftGroupService service = createService("test", peer, nodeOpts, List.of());
Node node = service.start();
assertEquals(26, fsm.getLogs().size());
for (int i = 0; i < 26; i++)
assertEquals('a' + i, fsm.getLogs().get(i).get());
// Group configuration will be restored from snapshot meta.
while (!node.isLeader())
Thread.sleep(20);
sendTestTaskAndWait(node);
assertEquals(36, fsm.getLogs().size());
}
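/**
* Tests bootstrapping a group with an empty log and no snapshot.
*/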
@Test
public void testBootStrapWithoutSnapshot() throws Exception {
TestPeer peer = new TestPeer(testInfo, 5006);
MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
Path path = Path.of(dataPath, "node0", "log");
Files.createDirectories(path);
BootstrapOptions opts = new BootstrapOptions();
DefaultLogStorageFactory logStorageProvider = new DefaultLogStorageFactory(path);
logStorageProvider.start();
opts.setServiceFactory(new IgniteJraftServiceFactory(logStorageProvider));
opts.setLastLogIndex(0);
opts.setRaftMetaUri(dataPath + File.separator + "meta");
opts.setSnapshotUri(dataPath + File.separator + "snapshot");
opts.setLogUri("test");
opts.setGroupConf(JRaftUtils.getConfiguration(peer.getPeerId().toString()));
opts.setFsm(fsm);
assertTrue(JRaftUtils.bootstrap(opts));
logStorageProvider.close();
NodeOptions nodeOpts = new NodeOptions();
nodeOpts.setRaftMetaUri(dataPath + File.separator + "meta");
nodeOpts.setSnapshotUri(dataPath + File.separator + "snapshot");
nodeOpts.setLogUri("test");
nodeOpts.setFsm(fsm);
DefaultLogStorageFactory log2 = new DefaultLogStorageFactory(path);
log2.start();
nodeOpts.setServiceFactory(new IgniteJraftServiceFactory(log2));
RaftGroupService service = createService("test", peer, nodeOpts, List.of());
Node node = service.start();
while (!node.isLeader())
Thread.sleep(20);
sendTestTaskAndWait(node);
assertEquals(10, fsm.getLogs().size());
}
@Test
public void testChangePeers() throws Exception {
changePeers(false);
}
@Test
public void testChangeAsyncPeers() throws Exception {
changePeers(true);
}
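/**
* Moves the group through ten single-peer configurations, transferring leadership step by step.
*/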
private void changePeers(boolean async) throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
List<TestPeer> peers = new ArrayList<>();
peers.add(peer0);
int numPeers = 10;
for (int i = 1; i < numPeers; i++) {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer, false, 300));
}
waitForTopologyOnEveryNode(numPeers, cluster);
for (int i = 0; i < 9; i++) {
leader = cluster.waitAndGetLeader();
assertNotNull(leader);
PeerId leaderPeer = peers.get(i).getPeerId();
assertEquals(leaderPeer, leader.getNodeId().getPeerId());
PeerId newLeaderPeer = peers.get(i + 1).getPeerId();
if (async) {
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeersAsync(new Configuration(Collections.singletonList(newLeaderPeer)),
leader.getCurrentTerm(), done);
Status status = done.await();
assertTrue(status.isOk(), status.getRaftError().toString());
assertTrue(waitForCondition(() -> {
if (cluster.getLeader() != null) {
return newLeaderPeer.equals(cluster.getLeader().getLeaderId());
}
return false;
}, 10_000));
} else {
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(new Configuration(Collections.singletonList(newLeaderPeer)), done);
Status status = done.await();
assertTrue(status.isOk(), status.getRaftError().toString());
}
}
cluster.waitAndGetLeader();
for (MockStateMachine fsm : cluster.getFsms()) {
assertEquals(10, fsm.getLogs().size());
}
}
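/**
* Tests that onReconfigurationError is invoked when changePeersAsync cannot catch up an absent peer.
*/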
@Test
public void testOnReconfigurationErrorListener() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
cluster.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any());
PeerId newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1).getPeerId();
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeersAsync(new Configuration(Collections.singletonList(newPeer)),
leader.getCurrentTerm(), done);
assertEquals(Status.OK(), done.await());
verify(raftGrpEvtsLsnr, timeout(10_000))
.onReconfigurationError(argThat(st -> st.getRaftError() == RaftError.ECATCHUP), any(), any(), anyLong());
}
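/**
* Tests that onNewPeersConfigurationApplied is invoked with the new peers and learners after each change.
*/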
@Test
public void testNewPeersConfigurationAppliedListener() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
cluster.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
List<TestPeer> peers = new ArrayList<>();
peers.add(peer0);
List<TestPeer> learners = new ArrayList<>();
int numPeers = 5;
for (int i = 1; i < numPeers; i++) {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer, false, 300));
TestPeer learner = new TestPeer(testInfo, TestUtils.INIT_PORT + i + numPeers);
learners.add(learner);
}
verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any());
// Wait until every node sees every other node, otherwise
// changePeersAsync can fail.
waitForTopologyOnEveryNode(numPeers, cluster);
for (int i = 0; i < 4; i++) {
leader = cluster.getLeader();
assertNotNull(leader);
PeerId peer = peers.get(i).getPeerId();
assertEquals(peer, leader.getNodeId().getPeerId());
PeerId newPeer = peers.get(i + 1).getPeerId();
PeerId newLearner = learners.get(i).getPeerId();
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeersAsync(new Configuration(List.of(newPeer), List.of(newLearner)), leader.getCurrentTerm(), done);
assertEquals(Status.OK(), done.await());
assertTrue(waitForCondition(() -> {
if (cluster.getLeader() != null) {
return newPeer.equals(cluster.getLeader().getLeaderId());
}
return false;
}, 10_000));
verify(raftGrpEvtsLsnr, times(1)).onNewPeersConfigurationApplied(List.of(newPeer), List.of(newLearner));
}
}
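/**
* Tests that onLeaderElected is invoked on the initial election and after each leader failure.
*/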
@Test
public void testChangePeersOnLeaderElected() throws Exception {
List<TestPeer> peers = IntStream.range(0, 6)
.mapToObj(i -> new TestPeer(testInfo, TestUtils.INIT_PORT + i))
.collect(toList());
cluster = new TestCluster("testChangePeers", dataPath, peers, testInfo);
var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
cluster.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
for (TestPeer p : peers) {
assertTrue(cluster.start(p, false, 300));
}
cluster.waitAndGetLeader();
verify(raftGrpEvtsLsnr, times(1)).onLeaderElected(anyLong());
cluster.stop(cluster.getLeader().getLeaderId());
cluster.waitAndGetLeader();
verify(raftGrpEvtsLsnr, times(2)).onLeaderElected(anyLong());
cluster.stop(cluster.getLeader().getLeaderId());
cluster.waitAndGetLeader();
verify(raftGrpEvtsLsnr, times(3)).onLeaderElected(anyLong());
}
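/**
* Tests changePeersAsync responses: a stale term and an identical configuration are no-ops, and of two concurrent calls one succeeds while the other gets EBUSY.
*/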
@Test
public void changePeersAsyncResponses() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
assertTrue(cluster.start(peer, false, 300));
leader = cluster.waitAndGetLeader();
assertNotNull(leader);
PeerId leaderPeer = peer0.getPeerId();
assertEquals(leaderPeer, leader.getNodeId().getPeerId());
TestPeer newLeaderPeer = new TestPeer(testInfo, peer0.getPort() + 1);
// wrong leader term, do nothing
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeersAsync(new Configuration(Collections.singletonList(newLeaderPeer.getPeerId())),
leader.getCurrentTerm() - 1, done);
assertEquals(Status.OK(), done.await());
// the same config, do nothing
done = new SynchronizedClosure();
leader.changePeersAsync(new Configuration(Collections.singletonList(leaderPeer)),
leader.getCurrentTerm(), done);
assertEquals(Status.OK(), done.await());
// change peer to new conf containing only new node
done = new SynchronizedClosure();
leader.changePeersAsync(new Configuration(Collections.singletonList(newLeaderPeer.getPeerId())),
leader.getCurrentTerm(), done);
assertEquals(Status.OK(), done.await());
assertTrue(waitForCondition(() -> {
if (cluster.getLeader() != null)
return newLeaderPeer.getPeerId().equals(cluster.getLeader().getLeaderId());
return false;
}, 10_000));
for (MockStateMachine fsm : cluster.getFsms()) {
assertEquals(10, fsm.getLogs().size());
}
// Check concurrent execution of two async changePeers operations.
Node newLeader = cluster.getLeader();
sendTestTaskAndWait(newLeader);
ExecutorService executor = Executors.newFixedThreadPool(10);
List<SynchronizedClosure> dones = new ArrayList<>();
List<Future<?>> futs = new ArrayList<>();
for (int i = 0; i < 2; i++) {
SynchronizedClosure newDone = new SynchronizedClosure();
dones.add(newDone);
futs.add(executor.submit(() -> {
newLeader.changePeersAsync(new Configuration(Collections.singletonList(peer0.getPeerId())), 2, newDone);
}));
}
futs.get(0).get();
futs.get(1).get();
Status firstDoneStatus = dones.get(0).await();
Status secondDoneStatus = dones.get(1).await();
assertThat(
List.of(firstDoneStatus.getRaftError(), secondDoneStatus.getRaftError()),
containsInAnyOrder(RaftError.SUCCESS, RaftError.EBUSY)
);
assertTrue(waitForCondition(() -> {
if (cluster.getLeader() != null)
return peer0.getPeerId().equals(cluster.getLeader().getLeaderId());
return false;
}, 10_000));
for (MockStateMachine fsm : cluster.getFsms()) {
assertEquals(20, fsm.getLogs().size());
}
}
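/**
* Tests that changePeers fails with ECATCHUP until all peers in the new configuration are started.
*/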
@Test
public void testChangePeersAddMultiNodes() throws Exception {
List<TestPeer> peers = new ArrayList<>();
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
peers.add(peer0);
cluster = new TestCluster("testChangePeersAddMultiNodes", dataPath, Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
Configuration conf = new Configuration();
conf.addPeer(peer0.getPeerId());
for (int i = 1; i < 3; i++) {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + i);
peers.add(peer);
conf.addPeer(peer.getPeerId());
}
TestPeer peer = peers.get(1);
// fail, because the peers are not started.
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(new Configuration(Collections.singletonList(peer.getPeerId())), done);
assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
// start peer1
assertTrue(cluster.start(peer));
// still fail, because peer2 is not started
done.reset();
leader.changePeers(conf, done);
assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
// start peer2
peer = peers.get(2);
assertTrue(cluster.start(peer));
done.reset();
// works
leader.changePeers(conf, done);
Status await = done.await();
assertTrue(await.isOk(), await.getErrorMsg());
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
}
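/**
* Tests that the leader steps down when a joint-consensus configuration change cannot be completed.
*/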
@Test
public void testChangePeersStepsDownInJointConsensus() throws Exception {
List<TestPeer> peers = new ArrayList<>();
TestPeer peer0 = new TestPeer(testInfo, 5006);
TestPeer peer1 = new TestPeer(testInfo, 5007);
TestPeer peer2 = new TestPeer(testInfo, 5008);
TestPeer peer3 = new TestPeer(testInfo, 5009);
// Start a single-node cluster.
peers.add(peer0);
cluster = new TestCluster("testChangePeersStepsDownInJointConsensus", dataPath, peers, testInfo);
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
sendTestTaskAndWait(leader);
// start peer1-3
assertTrue(cluster.start(peer1));
assertTrue(cluster.start(peer2));
assertTrue(cluster.start(peer3));
// Make sure the topology is ready before adding peers.
assertTrue(waitForTopology(cluster, leader.getNodeId().getPeerId(), 4, 3_000));
Configuration conf = new Configuration();
conf.addPeer(peer0.getPeerId());
conf.addPeer(peer1.getPeerId());
conf.addPeer(peer2.getPeerId());
conf.addPeer(peer3.getPeerId());
// change peers
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(conf, done);
assertTrue(done.await().isOk());
// stop peer3
assertTrue(cluster.stop(peer3.getPeerId()));
conf.removePeer(peer0.getPeerId());
conf.removePeer(peer1.getPeerId());
// Change peers to [peer2, peer3], which must fail since peer3 is stopped
done.reset();
leader.changePeers(conf, done);
assertEquals(RaftError.EPERM, done.await().getRaftError());
log.info(done.getStatus().toString());
assertFalse(((NodeImpl) leader).getConf().isStable());
leader = cluster.getLeader();
assertNull(leader);
assertTrue(cluster.start(peer3));
Thread.sleep(1000);
leader = cluster.waitAndGetLeader(Set.of(peer2.getPeerId(), peer3.getPeerId()));
List<PeerId> thePeers = leader.listPeers();
assertFalse(thePeers.isEmpty());
assertEquals(conf.getPeerSet(), new HashSet<>(thePeers));
}
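/**
* Arguments for the background change-peers thread.
*/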
static class ChangeArg {
TestCluster c;
List<PeerId> peers;
volatile boolean stop;
boolean dontRemoveFirstPeer;
ChangeArg(TestCluster c, List<PeerId> peers, boolean stop,
boolean dontRemoveFirstPeer) {
super();
this.c = c;
this.peers = peers;
this.stop = stop;
this.dontRemoveFirstPeer = dontRemoveFirstPeer;
}
}
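/**
* Starts a background task that repeatedly changes the group to a random subset of the given peers.
*/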
private Future<?> startChangePeersThread(ChangeArg arg) {
Set<RaftError> expectedErrors = new HashSet<>();
expectedErrors.add(RaftError.EBUSY);
expectedErrors.add(RaftError.EPERM);
expectedErrors.add(RaftError.ECATCHUP);
ExecutorService executor = Executors.newSingleThreadExecutor();
executors.add(executor);
return Utils.runInThread(executor, () -> {
try {
while (!arg.stop) {
Node leader = arg.c.waitAndGetLeader();
if (leader == null)
continue;
// Select peers at random.
Configuration conf = new Configuration();
if (arg.dontRemoveFirstPeer)
conf.addPeer(arg.peers.get(0));
for (int i = 0; i < arg.peers.size(); i++) {
boolean select = ThreadLocalRandom.current().nextInt(64) < 32;
if (select && !conf.contains(arg.peers.get(i)))
conf.addPeer(arg.peers.get(i));
}
if (conf.isEmpty()) {
log.warn("No peer has been selected");
continue;
}
SynchronizedClosure done = new SynchronizedClosure();
leader.changePeers(conf, done);
done.await();
assertTrue(done.getStatus().isOk() || expectedErrors.contains(done.getStatus().getRaftError()), done.getStatus().toString());
}
}
catch (InterruptedException e) {
log.error("ChangePeersThread is interrupted", e);
}
});
}
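/**
* Tests applying tasks while configurations are changed randomly in the background, with snapshotting enabled.
*/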
@Test
public void testChangePeersChaosWithSnapshot() throws Exception {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
cluster = new TestCluster("testChangePeersChaosWithSnapshot", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
assertTrue(cluster.start(peers.get(0), false, 2));
// start other peers
for (int i = 1; i < 10; i++) {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer));
}
ChangeArg arg = new ChangeArg(cluster, peers.stream().map(TestPeer::getPeerId).collect(toList()), false, false);
Future<?> future = startChangePeersThread(arg);
for (int i = 0; i < 5000; ) {
Node leader = cluster.waitAndGetLeader();
if (leader == null)
continue;
SynchronizedClosure done = new SynchronizedClosure();
Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes(UTF_8)), done);
leader.apply(task);
Status status = done.await();
if (status.isOk()) {
if (++i % 100 == 0)
System.out.println("Progress:" + i);
}
else
assertEquals(RaftError.EPERM, status.getRaftError());
}
arg.stop = true;
future.get();
SynchronizedClosure done = new SynchronizedClosure();
Node leader = cluster.waitAndGetLeader();
leader.changePeers(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
Status st = done.await();
assertTrue(st.isOk(), st.getErrorMsg());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms())
assertTrue(fsm.getLogs().size() >= 5000);
}
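/**
* Tests applying tasks while configurations are changed randomly in the background, with snapshotting effectively disabled.
*/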
@Test
public void testChangePeersChaosWithoutSnapshot() throws Exception {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
cluster = new TestCluster("testChangePeersChaosWithoutSnapshot", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
assertTrue(cluster.start(peers.get(0), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer, false, 10000));
}
ChangeArg arg = new ChangeArg(cluster, peers.stream().map(TestPeer::getPeerId).collect(toList()), false, true);
Future<?> future = startChangePeersThread(arg);
final int tasks = 5000;
for (int i = 0; i < tasks; ) {
Node leader = cluster.waitAndGetLeader();
if (leader == null)
continue;
SynchronizedClosure done = new SynchronizedClosure();
Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes(UTF_8)), done);
leader.apply(task);
Status status = done.await();
if (status.isOk()) {
if (++i % 100 == 0)
System.out.println("Progress:" + i);
}
else
assertEquals(RaftError.EPERM, status.getRaftError());
}
arg.stop = true;
future.get();
SynchronizedClosure done = new SynchronizedClosure();
Node leader = cluster.waitAndGetLeader();
leader.changePeers(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms()) {
final int logSize = fsm.getLogs().size();
assertTrue(logSize >= tasks, "logSize=" + logSize);
}
}
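/**
* Tests applying tasks from several threads concurrently with random configuration changes.
*/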
@Test
public void testChangePeersChaosApplyTasks() throws Exception {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
cluster = new TestCluster("testChangePeersChaosApplyTasks", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
assertTrue(cluster.start(peers.get(0), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + i);
peers.add(peer);
assertTrue(cluster.start(peer, false, 100000));
}
final int threads = 3;
List<ChangeArg> args = new ArrayList<>();
List<Future<?>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(threads);
ExecutorService executor = Executors.newFixedThreadPool(threads);
executors.add(executor);
for (int t = 0; t < threads; t++) {
ChangeArg arg = new ChangeArg(cluster, peers.stream().map(TestPeer::getPeerId).collect(toList()), false, true);
args.add(arg);
futures.add(startChangePeersThread(arg));
Utils.runInThread(executor, () -> {
try {
for (int i = 0; i < 5000; ) {
Node leader = cluster.waitAndGetLeader();
if (leader == null)
continue;
SynchronizedClosure done = new SynchronizedClosure();
Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes(UTF_8)), done);
leader.apply(task);
Status status = done.await();
if (status.isOk()) {
if (++i % 100 == 0)
System.out.println("Progress:" + i);
}
else
assertEquals(RaftError.EPERM, status.getRaftError());
}
}
catch (Exception e) {
log.error("Failed to run tasks", e);
}
finally {
latch.countDown();
}
});
}
latch.await();
for (ChangeArg arg : args)
arg.stop = true;
for (Future<?> future : futures)
future.get();
SynchronizedClosure done = new SynchronizedClosure();
Node leader = cluster.waitAndGetLeader();
leader.changePeers(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms()) {
int logSize = fsm.getLogs().size();
assertTrue(logSize >= 5000 * threads, "logSize= " + logSize);
}
}
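/**
* Tests that no leader is elected while RequestVote messages are blocked on the followers.
*/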
@Test
public void testBlockedElection() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
Node leader = cluster.waitAndGetLeader();
log.warn("Current leader {}, electTimeout={}", leader.getNodeId().getPeerId(), leader.getOptions().getElectionTimeoutMs());
List<Node> followers = cluster.getFollowers();
blockMessagesOnFollowers(followers, (msg, nodeId) -> {
if (msg instanceof RpcRequests.RequestVoteRequest) {
RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
return !msg0.preVote();
}
return false;
});
log.warn("Stop leader {}, curTerm={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getCurrentTerm());
assertTrue(cluster.stop(leader.getNodeId().getPeerId()));
assertNull(cluster.getLeader());
Thread.sleep(2000);
assertNull(cluster.getLeader());
stopBlockingMessagesOnFollowers(followers);
// elect new leader
leader = cluster.waitAndGetLeader();
log.info("Elect new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());
}
@Test
public void testElectionTimeoutAutoAdjustWhenBlockedAllMessages() throws Exception {
testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) -> true);
}
@Test
public void testElectionTimeoutAutoAdjustWhenBlockedRequestVoteMessages() throws Exception {
testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) -> {
if (msg instanceof RpcRequests.RequestVoteRequest) {
RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
return !msg0.preVote();
}
return false;
});
}
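/**
* Checks that followers increase their election timeout while messages matching the predicate are blocked and reset it after a successful election.
*/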
private void testElectionTimeoutAutoAdjustWhenBlockedMessages(BiPredicate<Object, String> blockingPredicate) throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 4);
int maxElectionRoundsWithoutAdjusting = 3;
cluster = new TestCluster("unittest", dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT_MILLIS,
opts -> opts.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy(11_000, maxElectionRoundsWithoutAdjusting)),
testInfo);
for (TestPeer peer : peers) {
assertTrue(cluster.start(peer));
}
Node leader = cluster.waitAndGetLeader();
int initElectionTimeout = leader.getOptions().getElectionTimeoutMs();
log.warn("Current leader {}, electTimeout={}", leader.getNodeId().getPeerId(), leader.getOptions().getElectionTimeoutMs());
List<Node> followers = cluster.getFollowers();
for (Node follower : followers) {
NodeImpl follower0 = (NodeImpl) follower;
assertEquals(initElectionTimeout, follower0.getOptions().getElectionTimeoutMs());
}
blockMessagesOnFollowers(followers, blockingPredicate);
log.warn("Stop leader {}, curTerm={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getCurrentTerm());
assertTrue(cluster.stop(leader.getNodeId().getPeerId()));
assertNull(cluster.getLeader());
assertTrue(waitForCondition(() -> followers.stream().allMatch(f -> f.getOptions().getElectionTimeoutMs() > initElectionTimeout),
(long) maxElectionRoundsWithoutAdjusting
// Need to multiply by 2 because stepDown happens after the voteTimer timeout.
* (initElectionTimeout + followers.get(0).getOptions().getRaftOptions().getMaxElectionDelayMs()) * 2));
stopBlockingMessagesOnFollowers(followers);
// elect new leader
leader = cluster.waitAndGetLeader();
log.info("Elected new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());
assertTrue(
waitForCondition(() -> followers.stream().allMatch(f -> f.getOptions().getElectionTimeoutMs() == initElectionTimeout),
3_000));
}
/**
* Tests if a read using leader leases works correctly after previous leader segmentation.
*/
@Test
public void testLeaseReadAfterSegmentation() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
cluster = new TestCluster("unittest", dataPath, peers, 3_000, testInfo);
for (TestPeer peer : peers) {
RaftOptions opts = new RaftOptions();
opts.setElectionHeartbeatFactor(2); // Election timeout divisor.
opts.setReadOnlyOptions(ReadOnlyOption.ReadOnlyLeaseBased);
assertTrue(cluster.start(peer, false, 300, false, null, opts));
}
NodeImpl leader = (NodeImpl) cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
sendTestTaskAndWait(leader);
cluster.ensureSame();
DefaultRaftClientService rpcService = (DefaultRaftClientService) leader.getRpcClientService();
RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
AtomicInteger cnt = new AtomicInteger();
rpcClientEx.blockMessages((msg, nodeId) -> {
assertTrue(msg instanceof RpcRequests.AppendEntriesRequest);
if (cnt.get() >= 2)
return true;
log.info("Send heartbeat: " + msg + " to " + nodeId);
cnt.incrementAndGet();
return false;
});
assertTrue(waitForCondition(() -> {
Node currentLeader = cluster.getLeader();
return currentLeader != null && !leader.getNodeId().equals(currentLeader.getNodeId());
}, 10_000));
CompletableFuture<Status> res = new CompletableFuture<>();
cluster.getLeader().readIndex(null, new ReadIndexClosure() {
@Override public void run(Status status, long index, byte[] reqCtx) {
res.complete(status);
}
});
assertTrue(res.get().isOk());
}
/**
* Tests propagation of HLC on heartbeat request and response.
*/
@Test
public void testHlcPropagation() throws Exception {
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 2);
cluster = new TestCluster("unitest", dataPath, peers, 3_000, testInfo);
for (TestPeer peer : peers) {
RaftOptions opts = new RaftOptions();
opts.setElectionHeartbeatFactor(4); // Election timeout divisor.
HybridClock clock = new HybridClockImpl();
assertTrue(cluster.start(peer, false, 300, false, null, opts, clock));
}
List<NodeImpl> nodes = cluster.getNodes();
for (NodeImpl node : nodes) {
RpcClientEx rpcClientEx = sender(node);
rpcClientEx.recordMessages((msg, nodeId) -> msg instanceof AppendEntriesRequestImpl
|| msg instanceof AppendEntriesResponseImpl);
}
Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
RpcClientEx client = sender(leader);
AtomicBoolean heartbeatRequest = new AtomicBoolean(false);
AtomicBoolean appendEntriesRequest = new AtomicBoolean(false);
AtomicBoolean heartbeatResponse = new AtomicBoolean(false);
AtomicBoolean appendEntriesResponse = new AtomicBoolean(false);
waitForCondition(() -> {
client.recordedMessages().forEach(msgs -> {
if (msgs[0] instanceof AppendEntriesRequestImpl) {
AppendEntriesRequestImpl msg = (AppendEntriesRequestImpl) msgs[0];
if (msg.entriesList() == null && msg.data() == null) {
heartbeatRequest.set(true);
} else {
appendEntriesRequest.set(true);
}
assertNotNull(msg.timestamp());
} else if (msgs[0] instanceof AppendEntriesResponseImpl) {
AppendEntriesResponseImpl msg = (AppendEntriesResponseImpl) msgs[0];
if (msg.timestamp() == null) {
appendEntriesResponse.set(true);
} else {
heartbeatResponse.set(true);
}
}
});
return heartbeatRequest.get() &&
appendEntriesRequest.get() &&
heartbeatResponse.get() &&
appendEntriesResponse.get();
},
5000);
assertTrue(heartbeatRequest.get());
assertTrue(appendEntriesRequest.get());
assertTrue(heartbeatResponse.get());
assertTrue(appendEntriesResponse.get());
}
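/**
* Creates node options with a started log storage factory for the given node index.
*/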
private NodeOptions createNodeOptions(int nodeIdx) {
NodeOptions options = new NodeOptions();
DefaultLogStorageFactory log = new DefaultLogStorageFactory(Path.of(dataPath, "node" + nodeIdx, "log"));
log.start();
options.setServiceFactory(new IgniteJraftServiceFactory(log));
options.setLogUri("test");
return options;
}
/**
 * Waits until the given peer sees at least the expected number of topology members.
 *
 * TODO asch get rid of waiting for topology IGNITE-14832
 *
 * @param cluster The cluster.
 * @param peerId Peer id of the node to check.
 * @param expected Expected number of topology members.
 * @param timeout Timeout in milliseconds.
 * @return {@code True} if the topology reached the expected size within the timeout.
 */
private static boolean waitForTopology(TestCluster cluster, PeerId peerId, int expected, long timeout) {
RaftGroupService grp = cluster.getServer(peerId);
if (grp == null) {
log.warn("Node has not been found {}", peerId);
return false;
}
RpcServer rpcServer = grp.getRpcServer();
if (!(rpcServer instanceof IgniteRpcServer))
return true;
ClusterService service = ((IgniteRpcServer) grp.getRpcServer()).clusterService();
long stop = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < stop) {
if (service.topologyService().allMembers().size() >= expected)
return true;
try {
Thread.sleep(50);
}
catch (InterruptedException e) {
return false;
}
}
return false;
}
/**
 * Waits for the condition to become {@code true} within the given timeout.
 *
 * @param cond The condition.
 * @param timeout The timeout in milliseconds.
 * @return {@code True} if the condition is satisfied within the timeout, {@code false} otherwise.
 */
private boolean waitForCondition(BooleanSupplier cond, long timeout) {
long stop = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < stop) {
if (cond.getAsBoolean())
return true;
try {
Thread.sleep(50);
}
catch (InterruptedException e) {
return false;
}
}
return false;
}
/**
 * Creates a raft group service.
 *
 * @param groupId Group id.
 * @param peer Peer to start the service on.
 * @param nodeOptions Node options.
 * @param peers All peers of the group, used for node discovery.
 * @return Raft group service.
 */
private RaftGroupService createService(String groupId, TestPeer peer, NodeOptions nodeOptions, Collection<TestPeer> peers) {
nodeOptions.setStripes(1);
nodeOptions.setLogStripesCount(1);
List<NetworkAddress> addressList = peers.stream()
.map(p -> new NetworkAddress(TestUtils.getLocalAddress(), p.getPort()))
.collect(toList());
var nodeManager = new NodeManager();
ClusterService clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
peer.getPort(),
new StaticNodeFinder(addressList)
);
ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
executors.add(requestExecutor);
IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
nodeOptions.setCommandsMarshaller(TestCluster.commandsMarshaller(clusterService));
assertThat(clusterService.startAsync(), willCompleteSuccessfully());
var service = new RaftGroupService(groupId, peer.getPeerId(), nodeOptions, rpcServer, nodeManager) {
@Override public synchronized void shutdown() {
rpcServer.shutdown();
super.shutdown();
assertThat(clusterService.stopAsync(), willCompleteSuccessfully());
}
};
services.add(service);
return service;
}
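/**
 * Applies 10 test tasks to the node and waits for all of them to complete with {@code RaftError.SUCCESS}.
 */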
private void sendTestTaskAndWait(Node node) throws InterruptedException {
this.sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS);
}
private void sendTestTaskAndWait(Node node, int amount) throws InterruptedException {
this.sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS);
}
private void sendTestTaskAndWait(Node node, RaftError err) throws InterruptedException {
this.sendTestTaskAndWait(node, 0, 10, err);
}
// Note that waiting for the latch when tasks are applying doesn't guarantee that FSMCallerImpl.lastAppliedIndex
// will be updated immediately.
private void sendTestTaskAndWait(Node node, int start, int amount,
RaftError err) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(amount);
for (int i = start; i < start + amount; i++) {
ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes(UTF_8));
Task task = new Task(data, new ExpectClosure(err, latch));
node.apply(task);
}
waitLatch(latch);
}
private void sendTestTaskAndWait(Node node, int start,
RaftError err) throws InterruptedException {
sendTestTaskAndWait(node, start, 10, err);
}
@SuppressWarnings("SameParameterValue")
private int sendTestTaskAndWait(String prefix, Node node, int amount,
int code) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(amount);
final AtomicInteger successCount = new AtomicInteger(0);
for (int i = 0; i < amount; i++) {
ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes(UTF_8));
Task task = new Task(data, new ExpectClosure(code, null, latch, successCount));
node.apply(task);
}
waitLatch(latch);
return successCount.get();
}
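/**
 * Triggers a snapshot on the leader and waits until the leader's state machine has saved it.
 */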
private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
triggerLeaderSnapshot(cluster, leader, 1);
}
private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times)
throws InterruptedException {
// Trigger a leader snapshot. The first snapshot may already have been triggered in the background,
// so accept either times - 1 or times previously saved snapshots.
int snapshotTimes = cluster.getLeaderFsm().getSaveSnapshotTimes();
assertTrue(snapshotTimes == times - 1 || snapshotTimes == times, "snapshotTimes=" + snapshotTimes + ", times=" + times);
CountDownLatch latch = new CountDownLatch(1);
leader.snapshot(new ExpectClosure(latch));
waitLatch(latch);
assertEquals(snapshotTimes + 1, cluster.getLeaderFsm().getSaveSnapshotTimes());
}
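/** Waits up to 30 seconds for the latch to reach zero, failing the test otherwise. */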
private void waitLatch(CountDownLatch latch) throws InterruptedException {
assertTrue(latch.await(30, TimeUnit.SECONDS));
}
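/**
 * Issues a read-index request on the node and verifies that the returned index and request context match.
 *
 * @return {@code True} if the read succeeded.
 */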
@SuppressWarnings({"unused", "SameParameterValue"})
private boolean assertReadIndex(Node node, int index) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
byte[] requestContext = TestUtils.getRandomBytes();
AtomicBoolean success = new AtomicBoolean(false);
node.readIndex(requestContext, new ReadIndexClosure() {
@Override
public void run(Status status, long theIndex, byte[] reqCtx) {
if (status.isOk()) {
assertEquals(index, theIndex);
assertArrayEquals(requestContext, reqCtx);
success.set(true);
}
else {
assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg());
assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg());
}
latch.countDown();
}
});
latch.await();
return success.get();
}
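/** Installs the given blocking predicate on the RPC client of every follower. */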
private void blockMessagesOnFollowers(List<Node> followers, BiPredicate<Object, String> blockingPredicate) {
for (Node follower : followers) {
RpcClientEx rpcClientEx = sender(follower);
rpcClientEx.blockMessages(blockingPredicate);
}
}
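/** Removes message blocking from every follower's RPC client. */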
private void stopBlockingMessagesOnFollowers(List<Node> followers) {
for (Node follower : followers) {
RpcClientEx rpcClientEx = sender(follower);
rpcClientEx.stopBlock();
}
}
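/** Asserts that every node in the cluster sees the expected number of topology members within 10 seconds. */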
static void waitForTopologyOnEveryNode(int count, TestCluster cluster) {
cluster.getAllNodes().forEach(peerId -> {
assertTrue(waitForTopology(cluster, peerId, count, TimeUnit.SECONDS.toMillis(10)));
});
}
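/** Finds a test peer by its peer id, throwing if absent. */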
private static TestPeer findById(Collection<TestPeer> peers, PeerId id) {
return peers.stream().filter(t -> t.getPeerId().equals(id)).findAny().orElseThrow();
}
}