blob: 0c2c7ee4aab4318a3ff55c578459f11a4c95d357 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.server;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.apache.ignite.raft.server.counter.GetValueCommand.getValueCommand;
import static org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
import org.apache.ignite.raft.server.counter.CounterListener;
import org.apache.ignite.raft.server.counter.GetValueCommand;
import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
import org.apache.ignite.raft.server.snasphot.SnapshotInMemoryStorageFactory;
import org.apache.ignite.raft.server.snasphot.UpdateCountRaftListener;
import org.junit.jupiter.api.Test;
/**
* Jraft server.
*/
class ItJraftCounterServerTest extends JraftAbstractTest {
/**
* Counter group name 0.
*/
private static final TestReplicationGroupId COUNTER_GROUP_0 = new TestReplicationGroupId("counter0");
/**
* Counter group name 1.
*/
private static final TestReplicationGroupId COUNTER_GROUP_1 = new TestReplicationGroupId("counter1");
/** Amount of stripes for disruptors that are used by JRAFT. */
private static final int RAFT_STRIPES = 3;
/** Amount of stripes for disruptors that are used by the log service for JRAFT. */
private static final int RAFT_LOG_STRIPES = 1;
/**
* Listener factory.
*/
private Supplier<CounterListener> listenerFactory = CounterListener::new;
/**
* Starts a cluster for the test.
*
* @throws Exception If failed.
*/
private void startCluster() throws Exception {
for (int i = 0; i < 3; i++) {
startServer(i, raftServer -> {
String localNodeName = raftServer.clusterService().topologyService().localMember().name();
Peer serverPeer = initialMembersConf.peer(localNodeName);
RaftGroupOptions groupOptions = groupOptions(raftServer);
raftServer.startRaftNode(
new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions
);
raftServer.startRaftNode(
new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions
);
}, opts -> {});
}
startClient(COUNTER_GROUP_0);
startClient(COUNTER_GROUP_1);
}
/**
* Checks that the number of Disruptor threads does not depend on count started RAFT nodes.
*/
@Test
public void testDisruptorThreadsCount() {
startServer(0, raftServer -> {
String localNodeName = raftServer.clusterService().topologyService().localMember().name();
var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group"), initialMembersConf.peer(localNodeName));
raftServer.startRaftNode(nodeId, initialMembersConf, listenerFactory.get(), groupOptions(raftServer));
}, opts -> {
opts.setStripes(RAFT_STRIPES);
opts.setLogStripesCount(RAFT_LOG_STRIPES);
opts.setLogYieldStrategy(true);
});
Set<Thread> threads = getAllDisruptorCurrentThreads();
int threadsBefore = threads.size();
Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(toSet());
assertEquals(RAFT_STRIPES * 3/* services */ + RAFT_LOG_STRIPES, threadsBefore, "Started thread names: " + threadNamesBefore);
servers.forEach(srv -> {
String localNodeName = srv.clusterService().topologyService().localMember().name();
Peer serverPeer = initialMembersConf.peer(localNodeName);
for (int i = 0; i < 10; i++) {
var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group_" + i), serverPeer);
srv.startRaftNode(nodeId, initialMembersConf, listenerFactory.get(), groupOptions(srv));
}
});
threads = getAllDisruptorCurrentThreads();
int threadsAfter = threads.size();
Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(toSet());
threadNamesAfter.removeAll(threadNamesBefore);
assertEquals(threadsBefore, threadsAfter, "Difference: " + threadNamesAfter);
}
/**
* Get a set of Disruptor threads for the well known JRaft services.
*
* @return Set of Disruptor threads.
*/
private Set<Thread> getAllDisruptorCurrentThreads() {
return Thread.getAllStackTraces().keySet().stream().filter(t ->
t.getName().contains("JRaft-FSMCaller-Disruptor")
|| t.getName().contains("JRaft-NodeImpl-Disruptor")
|| t.getName().contains("JRaft-ReadOnlyService-Disruptor")
|| t.getName().contains("JRaft-LogManager-Disruptor"))
.collect(toSet());
}
@Test
public void testRefreshLeader() throws Exception {
startCluster();
Peer leader = clients.get(0).leader();
assertNull(leader);
clients.get(0).refreshLeader().get();
assertNotNull(clients.get(0).leader());
leader = clients.get(1).leader();
assertNull(leader);
clients.get(1).refreshLeader().get();
assertNotNull(clients.get(1).leader());
}
@Test
public void testCounterCommandListener() throws Exception {
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
assertNotNull(client1.leader());
assertNotNull(client2.leader());
assertEquals(2, client1.<Long>run(incrementAndGetCommand(2)).get());
assertEquals(2, client1.<Long>run(getValueCommand()).get());
assertEquals(3, client1.<Long>run(incrementAndGetCommand(1)).get());
assertEquals(3, client1.<Long>run(getValueCommand()).get());
assertEquals(4, client2.<Long>run(incrementAndGetCommand(4)).get());
assertEquals(4, client2.<Long>run(getValueCommand()).get());
assertEquals(7, client2.<Long>run(incrementAndGetCommand(3)).get());
assertEquals(7, client2.<Long>run(getValueCommand()).get());
}
@Test
public void testCreateSnapshot() throws Exception {
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
JraftServerImpl server = servers.get(0);
long val = applyIncrements(client1, 1, 10);
assertEquals(sum(10), val);
Peer localPeer0 = server.localPeers(COUNTER_GROUP_0).get(0);
client1.snapshot(localPeer0).get();
long val2 = applyIncrements(client2, 1, 20);
assertEquals(sum(20), val2);
Peer localPeer1 = server.localPeers(COUNTER_GROUP_1).get(0);
client2.snapshot(localPeer1).get();
Path snapshotDir0 = server.getServerDataPath(new RaftNodeId(COUNTER_GROUP_0, localPeer0)).resolve("snapshot");
assertEquals(1L, countFiles(snapshotDir0));
Path snapshotDir1 = server.getServerDataPath(new RaftNodeId(COUNTER_GROUP_1, localPeer1)).resolve("snapshot");
assertEquals(1L, countFiles(snapshotDir1));
}
/**
* Returns the number of files in the given directory (non-recursive).
*/
private static long countFiles(Path dir) throws IOException {
try (Stream<Path> files = Files.list(dir)) {
return files.count();
}
}
@Test
public void testCreateSnapshotGracefulFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
doneClo.accept(new IgniteInternalException("Very bad"));
}
};
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
RaftServer server = servers.get(0);
Peer peer = server.localPeers(COUNTER_GROUP_0).get(0);
long val = applyIncrements(client1, 1, 10);
assertEquals(sum(10), val);
try {
client1.snapshot(peer).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof RaftException);
}
}
@Test
public void testCreateSnapshotAbnormalFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
doneClo.accept(new IgniteInternalException("Very bad"));
}
};
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
long val = applyIncrements(client1, 1, 10);
assertEquals(sum(10), val);
Peer peer = servers.get(0).localPeers(COUNTER_GROUP_0).get(0);
try {
client1.snapshot(peer).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof RaftException);
}
}
/** Tests if a raft group become unavailable in case of a critical error. */
@Test
public void testApplyWithFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
Iterator<CommandClosure<WriteCommand>> wrapper = new Iterator<>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public CommandClosure<WriteCommand> next() {
CommandClosure<WriteCommand> cmd = iterator.next();
IncrementAndGetCommand command = (IncrementAndGetCommand) cmd.command();
if (command.delta() == 10) {
throw new IgniteInternalException("Very bad");
}
return cmd;
}
};
super.onWrite(wrapper);
}
};
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
NodeImpl leader = getRaftNodes(COUNTER_GROUP_0)
.filter(n -> n.getState() == STATE_LEADER)
.findFirst()
.orElse(null);
assertNotNull(leader);
long val1 = applyIncrements(client1, 1, 5);
long val2 = applyIncrements(client2, 1, 7);
assertEquals(sum(5), val1);
assertEquals(sum(7), val2);
long val3 = applyIncrements(client1, 6, 9);
assertEquals(sum(9), val3);
try {
client1.<Long>run(incrementAndGetCommand(10)).get();
fail();
} catch (Exception e) {
// Expected.
Throwable cause = e.getCause();
assertTrue(cause instanceof RaftException);
}
NodeImpl finalLeader = leader;
waitForCondition(() -> finalLeader.getState() == STATE_ERROR, 5_000);
// Client can't switch to new leader, because only one peer in the list.
try {
client1.<Long>run(incrementAndGetCommand(11)).get();
} catch (Exception e) {
boolean isValid = e.getCause() instanceof TimeoutException;
if (!isValid) {
logger().error("Got unexpected exception", e);
}
assertTrue(isValid, "Expecting the timeout");
}
}
/** Tests that users related exceptions from SM are propagated to the client. */
@Test
public void testClientCatchExceptionFromSm() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
while (iterator.hasNext()) {
CommandClosure<WriteCommand> clo = iterator.next();
IncrementAndGetCommand cmd0 = (IncrementAndGetCommand) clo.command();
clo.result(new RuntimeException("Expected message"));
}
}
@Override
public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
while (iterator.hasNext()) {
CommandClosure<ReadCommand> clo = iterator.next();
assert clo.command() instanceof GetValueCommand;
clo.result(new RuntimeException("Another expected message"));
}
}
};
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
NodeImpl leader = getRaftNodes(COUNTER_GROUP_0)
.filter(n -> n.getState() == STATE_LEADER)
.findFirst()
.orElse(null);
assertNotNull(leader);
try {
client1.<Long>run(incrementAndGetCommand(3)).get();
fail();
} catch (Exception e) {
// Expected.
Throwable cause = e.getCause();
assertTrue(cause instanceof RuntimeException);
assertEquals(cause.getMessage(), "Expected message");
}
try {
client1.<Long>run(getValueCommand()).get();
fail();
} catch (Exception e) {
// Expected.
Throwable cause = e.getCause();
assertTrue(cause instanceof RuntimeException);
assertEquals(cause.getMessage(), "Another expected message");
}
}
/** Tests if a follower is catching up the leader after restarting. */
@Test
public void testFollowerCatchUpFromLog() throws Exception {
doTestFollowerCatchUp(false, true);
}
@Test
public void testFollowerCatchUpFromSnapshot() throws Exception {
doTestFollowerCatchUp(true, true);
}
@Test
public void testFollowerCatchUpFromLog2() throws Exception {
doTestFollowerCatchUp(false, false);
}
@Test
public void testFollowerCatchUpFromSnapshot2() throws Exception {
doTestFollowerCatchUp(true, false);
}
/** Tests if a starting a new group in shared pools mode doesn't increases timer threads count. */
@Test
public void testTimerThreadsCount() {
JraftServerImpl srv0 = startServer(0, x -> {
}, opts -> opts.setTimerPoolSize(1));
JraftServerImpl srv1 = startServer(1, x -> {
}, opts -> opts.setTimerPoolSize(1));
JraftServerImpl srv2 = startServer(2, x -> {
}, opts -> opts.setTimerPoolSize(1));
waitForTopology(srv0.clusterService(), 3, 5_000);
ExecutorService svc = Executors.newFixedThreadPool(16);
final int groupsCnt = 10;
try {
List<Future<?>> futs = new ArrayList<>(groupsCnt);
for (int i = 0; i < groupsCnt; i++) {
int finalI = i;
futs.add(svc.submit(() -> {
var groupId = new TestReplicationGroupId("counter" + finalI);
for (RaftServer srv : Arrays.asList(srv0, srv1, srv2)) {
String localNodeName = srv.clusterService().topologyService().localMember().name();
Peer serverPeer = initialMembersConf.peer(localNodeName);
RaftGroupOptions groupOptions = groupOptions(srv);
srv.startRaftNode(new RaftNodeId(groupId, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions);
}
}));
}
for (Future<?> fut : futs) {
try {
fut.get();
} catch (Exception e) {
fail(e.getMessage());
}
}
} finally {
ExecutorServiceHelper.shutdownAndAwaitTermination(svc);
}
for (int i = 0; i < groupsCnt; i++) {
TestReplicationGroupId grp = new TestReplicationGroupId("counter" + i);
assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
}
Set<Thread> threads = Thread.getAllStackTraces().keySet();
logger().info("RAFT threads count {}", threads.stream().filter(t -> t.getName().contains("JRaft")).count());
List<Thread> timerThreads = threads.stream().filter(this::isTimer).sorted(comparing(Thread::getName)).collect(toList());
assertTrue(timerThreads.size() <= 15, // This is a maximum possible number of a timer threads for 3 nodes in this test.
"All timer threads: " + timerThreads.toString());
}
/**
* The test shows that all committed updates are applied after a RAFT group restart automatically.
* Actual data be available to read from state storage (not a state machine) directly just after the RAFT node started.
*
* @throws Exception If failed.
*/
@Test
public void testApplyUpdatesOutOfSnapshot() throws Exception {
var counters = new ConcurrentHashMap<Peer, AtomicInteger>();
var snapshotDataStorage = new ConcurrentHashMap<Path, Integer>();
var snapshotMetaStorage = new ConcurrentHashMap<String, SnapshotMeta>();
var grpId = new TestReplicationGroupId("test_raft_group");
for (int i = 0; i < 3; i++) {
startServer(i, raftServer -> {
String localNodeName = raftServer.clusterService().topologyService().localMember().name();
Peer serverPeer = initialMembersConf.peer(localNodeName);
var counter = new AtomicInteger();
counters.put(serverPeer, counter);
var listener = new UpdateCountRaftListener(counter, snapshotDataStorage);
RaftGroupOptions opts = groupOptions(raftServer)
.snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage));
raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), initialMembersConf, listener, opts);
}, opts -> {});
}
var raftClient = startClient(grpId);
raftClient.refreshMembers(true).get();
List<Peer> peers = raftClient.peers();
var testWriteCommandBuilder = new TestRaftMessagesFactory().testWriteCommand();
raftClient.run(testWriteCommandBuilder.build());
Peer peer0 = peers.get(0);
assertTrue(waitForCondition(() -> counters.get(peer0).get() == 1, 10_000));
raftClient.snapshot(peer0).get();
raftClient.run(testWriteCommandBuilder.build());
Peer peer1 = peers.get(1);
assertTrue(waitForCondition(() -> counters.get(peer1).get() == 2, 10_000));
raftClient.snapshot(peer1).get();
raftClient.run(testWriteCommandBuilder.build());
for (AtomicInteger counter : counters.values()) {
assertTrue(waitForCondition(() -> counter.get() == 3, 10_000));
}
Path peer0SnapPath = snapshotPath(peer0, grpId);
Path peer1SnapPath = snapshotPath(peer1, grpId);
Path peer2SnapPath = snapshotPath(peers.get(2), grpId);
shutdownCluster();
assertEquals(1, snapshotDataStorage.get(peer0SnapPath));
assertEquals(2, snapshotDataStorage.get(peer1SnapPath));
assertNull(snapshotDataStorage.get(peer2SnapPath));
assertNotNull(snapshotMetaStorage.get(peer0SnapPath.toString()));
assertNotNull(snapshotMetaStorage.get(peer1SnapPath.toString()));
assertNull(snapshotMetaStorage.get(peer2SnapPath.toString()));
for (int i = 0; i < 3; i++) {
startServer(i, raftServer -> {
String localNodeName = raftServer.clusterService().topologyService().localMember().name();
Peer serverPeer = initialMembersConf.peer(localNodeName);
var counter = counters.get(serverPeer);
counter.set(0);
var listener = new UpdateCountRaftListener(counter, snapshotDataStorage);
RaftGroupOptions opts = groupOptions(raftServer)
.snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage));
raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), initialMembersConf, listener, opts);
}, opts -> {});
}
for (AtomicInteger counter : counters.values()) {
assertTrue(waitForCondition(() -> counter.get() == 3, 10_000));
}
}
/**
* Builds a snapshot path by the peer address of RAFT node.
*/
private Path snapshotPath(Peer peer, ReplicationGroupId groupId) {
JraftServerImpl server = servers.stream()
.filter(s -> s.localPeers(groupId).contains(peer))
.findAny()
.orElseThrow();
return server.getServerDataPath(new RaftNodeId(groupId, peer)).resolve("snapshot");
}
/**
* Returns {@code true} if thread is related to timers.
*
* @param thread The thread.
* @return {@code True} if a timer thread.
*/
private boolean isTimer(Thread thread) {
String name = thread.getName();
return name.contains("ElectionTimer") || name.contains("VoteTimer")
|| name.contains("StepDownTimer") || name.contains("SnapshotTimer") || name.contains("Node-Scheduler");
}
/**
* Returns {@code true} if a raft group has elected a leader for a some term.
*
* @param grpId Group id.
* @return {@code True} if a leader is elected.
*/
private boolean hasLeader(TestReplicationGroupId grpId) {
return getRaftNodes(grpId)
.anyMatch(node -> {
var fsm = (StateMachineAdapter) node.getOptions().getFsm();
return node.isLeader() && fsm.getLeaderTerm() == node.getCurrentTerm();
});
}
private Stream<NodeImpl> getRaftNodes(TestReplicationGroupId grpId) {
return servers.stream()
.flatMap(s -> s.localPeers(grpId).stream()
.map(peer -> new RaftNodeId(grpId, peer))
.map(s::raftGroupService))
.map(s -> ((NodeImpl) s.getRaftNode()));
}
/**
* Do test follower catch up.
*
* @param snapshot {@code True} to create snapshot on leader and truncate log.
* @param cleanDir {@code True} to clean persistent state on follower before restart.
* @throws Exception If failed.
*/
private void doTestFollowerCatchUp(boolean snapshot, boolean cleanDir) throws Exception {
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
Peer leader1 = client1.leader();
assertNotNull(leader1);
Peer leader2 = client2.leader();
assertNotNull(leader2);
applyIncrements(client1, 0, 10);
applyIncrements(client2, 0, 20);
// First snapshot will not truncate logs.
client1.snapshot(leader1).get();
client2.snapshot(leader2).get();
JraftServerImpl toStop = null;
// Find the follower for both groups.
for (JraftServerImpl server : servers) {
List<Peer> peers = server.localPeers(COUNTER_GROUP_0);
if (!peers.contains(leader1) && !peers.contains(leader2)) {
toStop = server;
break;
}
}
var raftNodeId0 = new RaftNodeId(COUNTER_GROUP_0, toStop.localPeers(COUNTER_GROUP_0).get(0));
var raftNodeId1 = new RaftNodeId(COUNTER_GROUP_1, toStop.localPeers(COUNTER_GROUP_1).get(0));
Path serverDataPath0 = toStop.getServerDataPath(raftNodeId0);
Path serverDataPath1 = toStop.getServerDataPath(raftNodeId1);
final int stopIdx = servers.indexOf(toStop);
toStop.stopRaftNode(raftNodeId0);
toStop.stopRaftNode(raftNodeId1);
toStop.beforeNodeStop();
assertThat(toStop.stopAsync(), willCompleteSuccessfully());
applyIncrements(client1, 11, 20);
applyIncrements(client2, 21, 30);
if (snapshot) {
client1.snapshot(leader1).get();
client2.snapshot(leader2).get();
}
if (cleanDir) {
IgniteUtils.deleteIfExists(serverDataPath0);
IgniteUtils.deleteIfExists(serverDataPath1);
}
var svc2 = startServer(stopIdx, r -> {
String localNodeName = r.clusterService().topologyService().localMember().name();
Peer serverPeer = initialMembersConf.peer(localNodeName);
r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r));
r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r));
}, opts -> {});
waitForCondition(() -> validateStateMachine(sum(20), svc2, COUNTER_GROUP_0), 5_000);
waitForCondition(() -> validateStateMachine(sum(30), svc2, COUNTER_GROUP_1), 5_000);
svc2.stopRaftNodes(COUNTER_GROUP_0);
svc2.stopRaftNodes(COUNTER_GROUP_1);
svc2.beforeNodeStop();
assertThat(svc2.stopAsync(), willCompleteSuccessfully());
var svc3 = startServer(stopIdx, r -> {
String localNodeName = r.clusterService().topologyService().localMember().name();
Peer serverPeer = initialMembersConf.peer(localNodeName);
r.startRaftNode(new RaftNodeId(COUNTER_GROUP_0, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r));
r.startRaftNode(new RaftNodeId(COUNTER_GROUP_1, serverPeer), initialMembersConf, listenerFactory.get(), groupOptions(r));
}, opts -> {});
waitForCondition(() -> validateStateMachine(sum(20), svc3, COUNTER_GROUP_0), 5_000);
waitForCondition(() -> validateStateMachine(sum(30), svc3, COUNTER_GROUP_1), 5_000);
}
/**
* Applies increments.
*
* @param client The client
* @param start Start element.
* @param stop Stop element.
* @return The counter value.
* @throws Exception If failed.
*/
private long applyIncrements(RaftGroupService client, int start, int stop) throws Exception {
long val = 0;
for (int i = start; i <= stop; i++) {
val = client.<Long>run(incrementAndGetCommand(i)).get();
logger().info("Val={}, i={}", val, i);
}
return val;
}
/**
* Calculates a progression sum.
*
* @param until Until value.
* @return The sum.
*/
private static long sum(long until) {
return (1 + until) * until / 2;
}
/**
* Validates state machine.
*
* @param expected Expected value.
* @param server The server.
* @param groupId Group id.
* @return Validation result.
*/
private static boolean validateStateMachine(long expected, JraftServerImpl server, TestReplicationGroupId groupId) {
Peer serverPeer = server.localPeers(groupId).get(0);
org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(new RaftNodeId(groupId, serverPeer));
var fsm0 = (JraftServerImpl.DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
return expected == ((CounterListener) fsm0.getListener()).value();
}
@Test
public void testReadIndex() throws Exception {
startCluster();
long index = clients.get(0).readIndex().join();
clients.get(0).<Long>run(incrementAndGetCommand(1)).get();
assertEquals(index + 1, clients.get(0).readIndex().join());
}
private static RaftGroupOptions groupOptions(RaftServer raftServer) {
return defaults().commandsMarshaller(new ThreadLocalOptimizedMarshaller(raftServer.clusterService().serializationRegistry()));
}
}