blob: b5f63725b8c32a9f43ae76e2ce2702775235bc16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.server;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Jraft server.
*/
@ExtendWith(WorkDirectoryExtension.class)
class ITJRaftCounterServerTest extends RaftServerAbstractTest {
/**
* The logger.
*/
private static final IgniteLogger LOG = IgniteLogger.forClass(ITJRaftCounterServerTest.class);
/**
* Counter group name 0.
*/
private static final String COUNTER_GROUP_0 = "counter0";
/**
* Counter group name 1.
*/
private static final String COUNTER_GROUP_1 = "counter1";
/**
* The server port offset.
*/
private static final int PORT = 5003;
/**
* The client port offset.
*/
private static final int CLIENT_PORT = 6003;
/**
* Initial configuration.
*/
private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
.mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
.map(Peer::new)
.collect(Collectors.toUnmodifiableList());
/**
* Listener factory.
*/
private Supplier<CounterListener> listenerFactory = CounterListener::new;
/**
* Servers list.
*/
private final List<JRaftServerImpl> servers = new ArrayList<>();
/**
* Clients list.
*/
private final List<RaftGroupService> clients = new ArrayList<>();
/**
* Data path.
*/
@WorkDirectory
private Path dataPath;
/** */
@BeforeEach
void before(TestInfo testInfo) {
LOG.info(">>>>>>>>>>>>>>> Start test method: {}", testInfo.getTestMethod().orElseThrow().getName());
}
/** */
@AfterEach
@Override protected void after(TestInfo testInfo) throws Exception {
LOG.info("Start client shutdown");
Iterator<RaftGroupService> iterClients = clients.iterator();
while (iterClients.hasNext()) {
RaftGroupService client = iterClients.next();
iterClients.remove();
client.shutdown();
}
LOG.info("Start server shutdown servers={}", servers.size());
Iterator<JRaftServerImpl> iterSrv = servers.iterator();
while (iterSrv.hasNext()) {
JRaftServerImpl server = iterSrv.next();
iterSrv.remove();
server.stop();
}
super.after(testInfo);
LOG.info(">>>>>>>>>>>>>>> End test method: {}", testInfo.getTestMethod().orElseThrow().getName());
}
/**
* @param idx The index.
* @return Raft server instance.
*/
private JRaftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
var addr = new NetworkAddress(getLocalAddress(), PORT);
ClusterService service = clusterService("server" + idx, PORT + idx, List.of(addr), true);
JRaftServerImpl server = new JRaftServerImpl(service, dataPath) {
@Override public void stop() {
servers.remove(this);
super.stop();
service.stop();
}
};
server.start();
clo.accept(server);
servers.add(server);
assertTrue(waitForTopology(service, servers.size(), 15_000));
return server;
}
/**
* @param groupId Group id.
* @return The client.
* @throws Exception If failed.
*/
private RaftGroupService startClient(String groupId) throws Exception {
var addr = new NetworkAddress(getLocalAddress(), PORT);
ClusterService clientNode = clusterService(
"client_" + groupId + "_", CLIENT_PORT + clients.size(), List.of(addr), true);
RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
List.of(new Peer(addr)), false, 200).get(3, TimeUnit.SECONDS);
clients.add(client);
return client;
}
/**
* Starts a cluster for the test.
*
* @throws Exception If failed.
*/
private void startCluster() throws Exception {
for (int i = 0; i < 3; i++) {
startServer(i, raftServer -> {
raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
});
}
startClient(COUNTER_GROUP_0);
startClient(COUNTER_GROUP_1);
}
/**
* Checks that the number of Disruptor threads does not depend on count started RAFT nodes.
*/
@Test
public void testDisruptorThreadsCount() {
startServer(0, raftServer -> {
raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF);
});
Set<Thread> threads = getAllDisruptorCurrentThreads();
int threadsBefore = threads.size();
Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(Collectors.toSet());
assertEquals(NodeOptions.DEFAULT_STRIPES * 4/*services*/, threadsBefore, "Started thread names: " + threadNamesBefore);
servers.forEach(srv -> {
for (int i = 0; i < 10; i++)
srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF);
});
threads = getAllDisruptorCurrentThreads();
int threadsAfter = threads.size();
Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(Collectors.toSet());
threadNamesAfter.removeAll(threadNamesBefore);
assertEquals(threadsBefore, threadsAfter, "Difference: " + threadNamesAfter);
}
/**
* Get a set of Disruptor threads for the well known JRaft services.
*
* @return Set of Disruptor threads.
*/
@NotNull private Set<Thread> getAllDisruptorCurrentThreads() {
return Thread.getAllStackTraces().keySet().stream().filter(t ->
t.getName().contains("JRaft-FSMCaller-Disruptor") ||
t.getName().contains("JRaft-NodeImpl-Disruptor") ||
t.getName().contains("JRaft-ReadOnlyService-Disruptor") ||
t.getName().contains("JRaft-LogManager-Disruptor"))
.collect(Collectors.toSet());
}
/**
*
*/
@Test
public void testRefreshLeader() throws Exception {
startCluster();
Peer leader = clients.get(0).leader();
assertNull(leader);
clients.get(0).refreshLeader().get();
assertNotNull(clients.get(0).leader());
leader = clients.get(1).leader();
assertNull(leader);
clients.get(1).refreshLeader().get();
assertNotNull(clients.get(1).leader());
}
/**
* @throws Exception
*/
@Test
public void testCounterCommandListener() throws Exception {
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
assertNotNull(client1.leader());
assertNotNull(client2.leader());
assertEquals(2, client1.<Long>run(new IncrementAndGetCommand(2)).get());
assertEquals(2, client1.<Long>run(new GetValueCommand()).get());
assertEquals(3, client1.<Long>run(new IncrementAndGetCommand(1)).get());
assertEquals(3, client1.<Long>run(new GetValueCommand()).get());
assertEquals(4, client2.<Long>run(new IncrementAndGetCommand(4)).get());
assertEquals(4, client2.<Long>run(new GetValueCommand()).get());
assertEquals(7, client2.<Long>run(new IncrementAndGetCommand(3)).get());
assertEquals(7, client2.<Long>run(new GetValueCommand()).get());
}
@Test
public void testCreateSnapshot() throws Exception {
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
JRaftServerImpl server = servers.get(0);
long val = applyIncrements(client1, 1, 10);
assertEquals(sum(10), val);
client1.snapshot(server.localPeer(COUNTER_GROUP_0)).get();
long val2 = applyIncrements(client2, 1, 20);
assertEquals(sum(20), val2);
client2.snapshot(server.localPeer(COUNTER_GROUP_1)).get();
Path snapshotDir0 = server.getServerDataPath(COUNTER_GROUP_0).resolve("snapshot");
assertEquals(1L, countFiles(snapshotDir0));
Path snapshotDir1 = server.getServerDataPath(COUNTER_GROUP_1).resolve("snapshot");
assertEquals(1L, countFiles(snapshotDir1));
}
/**
* Returns the number of files in the given directory (non-recursive).
*/
private static long countFiles(Path dir) throws IOException {
try (Stream<Path> files = Files.list(dir)) {
return files.count();
}
}
@Test
public void testCreateSnapshotGracefulFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
doneClo.accept(new IgniteInternalException("Very bad"));
}
};
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
RaftServer server = servers.get(0);
Peer peer = server.localPeer(COUNTER_GROUP_0);
long val = applyIncrements(client1, 1, 10);
assertEquals(sum(10), val);
try {
client1.snapshot(peer).get();
fail();
}
catch (Exception e) {
assertTrue(e.getCause() instanceof RaftException);
}
}
@Test
public void testCreateSnapshotAbnormalFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
doneClo.accept(new IgniteInternalException("Very bad"));
}
};
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
long val = applyIncrements(client1, 1, 10);
assertEquals(sum(10), val);
Peer peer = servers.get(0).localPeer(COUNTER_GROUP_0);
try {
client1.snapshot(peer).get();
fail();
}
catch (Exception e) {
assertTrue(e.getCause() instanceof RaftException);
}
}
/** Tests if a raft group become unavaiable in case of a critical error */
@Test
public void testApplyWithFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
@Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
Iterator<CommandClosure<WriteCommand>> wrapper = new Iterator<>() {
@Override public boolean hasNext() {
return iterator.hasNext();
}
@Override public CommandClosure<WriteCommand> next() {
CommandClosure<WriteCommand> cmd = iterator.next();
IncrementAndGetCommand command = (IncrementAndGetCommand)cmd.command();
if (command.delta() == 10)
throw new IgniteInternalException("Very bad");
return cmd;
}
};
super.onWrite(wrapper);
}
};
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
NodeImpl leader = servers.stream().map(s -> ((NodeImpl)s.raftGroupService(COUNTER_GROUP_0).getRaftNode())).
filter(n -> n.getState() == STATE_LEADER).findFirst().orElse(null);
assertNotNull(leader);
long val1 = applyIncrements(client1, 1, 5);
long val2 = applyIncrements(client2, 1, 7);
assertEquals(sum(5), val1);
assertEquals(sum(7), val2);
long val3 = applyIncrements(client1, 6, 9);
assertEquals(sum(9), val3);
try {
client1.<Long>run(new IncrementAndGetCommand(10)).get();
fail();
}
catch (Exception e) {
// Expected.
Throwable cause = e.getCause();
assertTrue(cause instanceof RaftException);
}
NodeImpl finalLeader = leader;
waitForCondition(() -> finalLeader.getState() == STATE_ERROR, 5_000);
// Client can't switch to new leader, because only one peer in the list.
try {
client1.<Long>run(new IncrementAndGetCommand(11)).get();
}
catch (Exception e) {
boolean isValid = e.getCause() instanceof TimeoutException;
if (!isValid)
LOG.error("Got unexpected exception", e);
assertTrue(isValid, "Expecting the timeout");
}
}
/** Tests if a follower is catching up the leader after restarting. */
@Test
public void testFollowerCatchUpFromLog() throws Exception {
doTestFollowerCatchUp(false, true);
}
/**
*
*/
@Test
public void testFollowerCatchUpFromSnapshot() throws Exception {
doTestFollowerCatchUp(true, true);
}
/**
*
*/
@Test
public void testFollowerCatchUpFromLog2() throws Exception {
doTestFollowerCatchUp(false, false);
}
/**
*
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15156")
public void testFollowerCatchUpFromSnapshot2() throws Exception {
doTestFollowerCatchUp(true, false);
}
/**
* @param snapshot {@code True} to create snapshot on leader and truncate log.
* @param cleanDir {@code True} to clean persistent state on follower before restart.
* @throws Exception If failed.
*/
private void doTestFollowerCatchUp(boolean snapshot, boolean cleanDir) throws Exception {
startCluster();
RaftGroupService client1 = clients.get(0);
RaftGroupService client2 = clients.get(1);
client1.refreshLeader().get();
client2.refreshLeader().get();
Peer leader1 = client1.leader();
assertNotNull(leader1);
Peer leader2 = client2.leader();
assertNotNull(leader2);
applyIncrements(client1, 0, 10);
applyIncrements(client2, 0, 20);
// First snapshot will not truncate logs.
client1.snapshot(leader1).get();
client2.snapshot(leader2).get();
JRaftServerImpl toStop = null;
// Find the follower for both groups.
for (JRaftServerImpl server : servers) {
Peer peer = server.localPeer(COUNTER_GROUP_0);
if (!peer.equals(leader1) && !peer.equals(leader2)) {
toStop = server;
break;
}
}
Path serverDataPath0 = toStop.getServerDataPath(COUNTER_GROUP_0);
Path serverDataPath1 = toStop.getServerDataPath(COUNTER_GROUP_1);
int stopIdx = servers.indexOf(toStop);
toStop.stop();
applyIncrements(client1, 11, 20);
applyIncrements(client2, 21, 30);
if (snapshot) {
client1.snapshot(leader1).get();
client2.snapshot(leader2).get();
}
if (cleanDir) {
IgniteUtils.deleteIfExists(serverDataPath0);
IgniteUtils.deleteIfExists(serverDataPath1);
}
var svc2 = startServer(stopIdx, r -> {
r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
});
waitForCondition(() -> validateStateMachine(sum(20), svc2, COUNTER_GROUP_0), 5_000);
waitForCondition(() -> validateStateMachine(sum(30), svc2, COUNTER_GROUP_1), 5_000);
svc2.stop();
var svc3 = startServer(stopIdx, r -> {
r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
});
waitForCondition(() -> validateStateMachine(sum(20), svc3, COUNTER_GROUP_0), 5_000);
waitForCondition(() -> validateStateMachine(sum(30), svc3, COUNTER_GROUP_1), 5_000);
}
/**
* @param client The client
* @param start Start element.
* @param stop Stop element.
* @return The counter value.
* @throws Exception If failed.
*/
private static long applyIncrements(RaftGroupService client, int start, int stop) throws Exception {
long val = 0;
for (int i = start; i <= stop; i++) {
val = client.<Long>run(new IncrementAndGetCommand(i)).get();
LOG.info("Val={}, i={}", val, i);
}
return val;
}
/**
* Calculates a progression sum.
*
* @param until Until value.
* @return The sum.
*/
private static long sum(long until) {
return (1 + until) * until / 2;
}
/**
* @param expected Expected value.
* @param server The server.
* @param groupId Group id.
* @return Validation result.
*/
private static boolean validateStateMachine(long expected, JRaftServerImpl server, String groupId) {
org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(groupId);
JRaftServerImpl.DelegatingStateMachine fsm0 =
(JRaftServerImpl.DelegatingStateMachine)svc.getRaftNode().getOptions().getFsm();
return expected == ((CounterListener)fsm0.getListener()).value();
}
}