blob: 7e8330f1654f3e636e8d05b5b46e9bb551b5e87d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.raft.service;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
/**
* Base class for persistent raft group's snapshots tests.
*
* @param <T> Type of the raft group listener.
*/
@ExtendWith(ConfigurationExtension.class)
public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener> extends IgniteAbstractTest {
/** Logger; also handed to the thread factory of the raft client executor. */
private static final IgniteLogger LOG = Loggers.forClass(ItAbstractListenerSnapshotTest.class);
/** Starting server port: the server with index {@code i} uses {@code PORT + i}. */
private static final int PORT = 5003;
/** Starting client port: a new client node uses {@code CLIENT_PORT} plus the number of clients started so far. */
private static final int CLIENT_PORT = 6003;
/** Raft messages factory, passed to the raft group service (client) when it is started. */
private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
/** Initial Raft configuration: one peer per node, rebuilt before every test. */
private PeersAndLearners initialMemberConf;
/** Cluster services of all nodes started by the test (servers and clients), in start order. */
private final List<ClusterService> cluster = new ArrayList<>();
/** Started raft servers; a server stopped by the test is removed from this list. */
private final List<JraftServerImpl> servers = new ArrayList<>();
/** Started raft group service clients. */
private final List<RaftGroupService> clients = new ArrayList<>();
/** Executor for raft group services; created before each test and shut down after it. */
private ScheduledExecutorService executor;
/** Raft configuration, injected via {@link InjectConfiguration}; passed to both raft servers and raft clients. */
@InjectConfiguration
private RaftConfiguration raftConfiguration;
/**
 * Prepares per-test state: the executor for raft group services and the initial raft group configuration.
 *
 * <p>The configuration contains one peer per node; a peer's consistent id is the test node name built from the test info
 * and the node's port.
 *
 * @param testInfo Test info.
 */
@BeforeEach
public void beforeTest(TestInfo testInfo) {
    executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));

    Stream<String> nodeNames = IntStream.range(0, nodes()).mapToObj(i -> testNodeName(testInfo, PORT + i));

    initialMemberConf = nodeNames.collect(collectingAndThen(toSet(), PeersAndLearners::fromConsistentIds));
}
/**
 * Releases everything started by a test.
 *
 * <p>The following closers are handed to {@code IgniteUtils.closeAll} in this order: stop of the test group's raft nodes on
 * every server, shutdown of raft clients, shutdown of the client executor, the {@code beforeNodeStop} phase of servers and
 * cluster services, and finally the asynchronous stop of those components.
 *
 * @throws Exception If any of the shutdown steps failed.
 */
@AfterEach
public void afterTest() throws Exception {
    // Servers go first, then cluster services.
    List<IgniteComponent> components = Stream.concat(servers.stream(), cluster.stream()).collect(toList());

    Stream<AutoCloseable> raftNodeStoppers = servers.stream().map(server -> () -> server.stopRaftNodes(raftGroupId()));

    Stream<AutoCloseable> clientStoppers = clients.stream().map(client -> client::shutdown);

    Stream<AutoCloseable> executorStopper = Stream.of(
            () -> IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS)
    );

    Stream<AutoCloseable> preStopHooks = Stream.concat(servers.stream(), cluster.stream())
            .map(component -> component::beforeNodeStop);

    Stream<AutoCloseable> componentStopper = Stream.of(
            () -> assertThat(stopAsync(components), willCompleteSuccessfully())
    );

    Stream<Stream<AutoCloseable>> phases =
            Stream.of(raftNodeStoppers, clientStoppers, executorStopper, preStopHooks, componentStopper);

    IgniteUtils.closeAll(phases.flatMap(Function.identity()));
}
/**
* Returns the number of raft nodes in the test group. One peer per node is put into the initial configuration and one
* server per peer is started. Subclasses may override this to change the group size.
*
* @return Nodes count ({@code 3} unless overridden).
*/
protected int nodes() {
return 3;
}
/**
* Returns a list of started servers.
*
* @return Unmodifiable copy of the current servers list; later changes to the internal list are not reflected in it.
*/
protected List<JraftServerImpl> servers() {
return List.copyOf(servers);
}
/**
 * Parameter set for a single run of {@link #testSnapshot}.
 */
private static class TestData {
    /**
     * {@code true} if the stopped follower's raft directory and listener storage must be deleted before the follower is
     * restarted, {@code false} otherwise.
     */
    private final boolean deleteFolder;

    /**
     * {@code true} if the test should interact with the raft group after the snapshots have been taken. In this case, the
     * follower node should catch up with the leader using raft log.
     */
    private final boolean interactAfterSnapshot;

    /**
     * Constructor.
     *
     * @param deleteFolder {@code true} if the raft group's persistence must be cleared before the follower's restart.
     * @param interactAfterSnapshot {@code true} if the test should interact with the raft group after a snapshot has been
     *         captured, so that the follower has to catch up using raft log.
     */
    private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
        this.interactAfterSnapshot = interactAfterSnapshot;
        this.deleteFolder = deleteFolder;
    }

    /** Returns both flags in a readable form. */
    @Override
    public String toString() {
        Object[] flags = {deleteFolder, interactAfterSnapshot};

        return String.format("deleteFolder=%s, interactAfterSnapshot=%s", flags);
    }
}
/**
 * Supplies the parameters of {@link #testSnapshot}: all four combinations of the {@code deleteFolder} and
 * {@code interactAfterSnapshot} flags.
 */
private static List<TestData> testSnapshotData() {
    TestData keepDataNoInteraction = new TestData(false, false);
    TestData wipeDataAndInteract = new TestData(true, true);
    TestData keepDataAndInteract = new TestData(false, true);
    TestData wipeDataNoInteraction = new TestData(true, false);

    return List.of(keepDataNoInteraction, wipeDataAndInteract, keepDataAndInteract, wipeDataNoInteraction);
}
/**
* Tests that a joining raft node successfully restores a snapshot.
*
* <p>Scenario: start the group, stop one non-leader node, take two snapshots on the leader (with
* {@link #afterFollowerStop} invoked in between), optionally delete the stopped node's data and optionally interact with
* the group once more, then restart the node and wait until {@link #snapshotCheckClosure} reports success.
*
* @param testData Test parameters.
* @param testInfo Test info.
* @throws Exception If failed.
*/
@ParameterizedTest
@MethodSource("testSnapshotData")
public void testSnapshot(TestData testData, TestInfo testInfo) throws Exception {
// Set up a raft group service
RaftGroupService service = prepareRaftGroup(testInfo);
// Refresh the leader; if it is still unknown after the first refresh, refresh once more
CompletableFuture<Void> refreshLeaderFuture = service.refreshLeader()
.thenCompose(v -> {
if (service.leader() == null) {
return service.refreshLeader();
} else {
return nullCompletedFuture();
}
});
assertThat(refreshLeaderFuture, willCompleteSuccessfully());
// Select any node that is not the leader of the group
JraftServerImpl toStop = servers.stream()
.filter(server -> !server.localPeers(raftGroupId()).contains(service.leader()))
.findAny()
.orElseThrow();
// Let the subclass interact with the group while all nodes are still running
beforeFollowerStop(service, toStop);
var nodeId = new RaftNodeId(raftGroupId(), toStop.localPeers(raftGroupId()).get(0));
// Get the path to that node's raft directory
Path serverDataPath = toStop.getServerDataPath(nodeId);
// Get the path to that node's listener persistence (as reported by the subclass)
Path dbPath = getListenerPersistencePath(getListener(toStop, raftGroupId()), toStop);
// Remember the node's position: it is passed to afterFollowerStop and reused as the index on restart
int stopIdx = servers.indexOf(toStop);
// Remove that node from the list of servers
servers.remove(stopIdx);
// Shutdown that node
toStop.stopRaftNode(nodeId);
toStop.beforeNodeStop();
assertThat(toStop.stopAsync(), willCompleteSuccessfully());
// Create a snapshot of the raft group
service.snapshot(service.leader()).get();
// Let the subclass interact with the group while the follower is down
afterFollowerStop(service, toStop, stopIdx);
// Create another raft snapshot
service.snapshot(service.leader()).get();
if (testData.deleteFolder) {
// Delete a stopped node's raft directory and key-value storage directory
// to check if snapshot could be restored by the restarted node
IgniteUtils.deleteIfExists(dbPath);
IgniteUtils.deleteIfExists(serverDataPath);
}
if (testData.interactAfterSnapshot) {
// Interact with the raft group after the second snapshot to check if the restarted node would see these
// interactions after restoring a snapshot and raft logs
afterSnapshot(service);
}
// Restart the node
JraftServerImpl restarted = startServer(testInfo, stopIdx);
// NOTE(review): cluster.get(0) is the cluster service of the first started server; if that server was the one stopped
// above, its cluster service has already been stopped as well - confirm that its topology view is still meaningful.
assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
// Poll the subclass-provided check until the restarted node has caught up (10 seconds at most)
BooleanSupplier closure = snapshotCheckClosure(restarted, testData.interactAfterSnapshot);
boolean success = waitForCondition(closure, 10_000);
assertTrue(success);
}
/**
* Interacts with the raft group before a follower is stopped. Called by {@link #testSnapshot} once a non-leader server
* has been selected, while all servers are still running.
*
* @param service Raft group service.
* @param server Raft server that is going to be stopped.
* @throws Exception If failed.
*/
public abstract void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception;
/**
* Interacts with the raft group after a follower is stopped. Called by {@link #testSnapshot} between the first and the
* second snapshot taken on the leader.
*
* @param service Raft group service.
* @param server Raft server that has been stopped.
* @param stoppedNodeIndex Index the stopped node had in the servers list before it was removed from it.
* @throws Exception If failed.
*/
public abstract void afterFollowerStop(RaftGroupService service, RaftServer server, int stoppedNodeIndex) throws Exception;
/**
* Interacts with a raft group after the leader has captured a snapshot. Called by {@link #testSnapshot} only when
* {@link TestData#interactAfterSnapshot} is set, after the second snapshot and before the follower is restarted.
*
* @param service Raft group service.
* @throws Exception If failed.
*/
public abstract void afterSnapshot(RaftGroupService service) throws Exception;
/**
* Creates a closure that will be executed periodically to check if the snapshot and (conditionally on the {@link
* TestData#interactAfterSnapshot}) the raft log was successfully restored by the follower node.
*
* @param restarted Restarted follower node.
* @param interactedAfterSnapshot {@code true} if the raft group was interacted with after the snapshot operation.
* @return Closure that returns {@code true} once the restarted node is in the expected state.
*/
public abstract BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted, boolean interactedAfterSnapshot);
/**
* Returns path to the group's persistence. {@link #testSnapshot} deletes this path when {@link TestData#deleteFolder}
* is set.
*
* @param listener Raft group listener.
* @param server Raft server, where the listener has been registered.
* @return Path to the group's persistence.
*/
public abstract Path getListenerPersistencePath(T listener, RaftServer server);
/**
* Creates raft group listener. Called once for every started (or restarted) server.
*
* @param service The cluster service of the node the listener is created for.
* @param listenerPersistencePath Path to storage persistent data ({@code db<index>} under the test work directory).
* @param index Index of node for which the listener is created.
* @return Raft group listener.
*/
public abstract RaftGroupListener createListener(ClusterService service, Path listenerPersistencePath, int index);
/**
* Returns raft group id for tests. The same id is used for the raft nodes on all servers and for the client.
*
* @return Raft group id.
*/
public abstract TestReplicationGroupId raftGroupId();
/**
 * Gets the raft group listener from the jraft server.
 *
 * <p>The listener is taken from the state machine of the raft node that belongs to the server's first local peer of the
 * given group.
 *
 * @param server Server.
 * @param grpId Raft group id.
 * @return Raft group listener.
 */
protected T getListener(JraftServerImpl server, TestReplicationGroupId grpId) {
    var nodeId = new RaftNodeId(grpId, server.localPeers(grpId).get(0));

    org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(nodeId);

    JraftServerImpl.DelegatingStateMachine fsm =
            (JraftServerImpl.DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();

    // The cast cannot be verified at runtime because T is erased. It presumably holds because the listener registered
    // on the server is the one produced by the subclass in createListener() - a mismatch would surface as a
    // ClassCastException at the call site.
    @SuppressWarnings("unchecked")
    T listener = (T) fsm.getListener();

    return listener;
}
/**
 * Waits until the given cluster service sees at least the expected number of topology members.
 *
 * @param cluster Cluster service whose topology is polled.
 * @param exp Minimal expected members count.
 * @param timeout The timeout in millis.
 * @return Result of {@code waitForCondition} for the "at least {@code exp} members" check.
 * @throws InterruptedException If the waiting thread was interrupted.
 */
private boolean waitForTopology(ClusterService cluster, int exp, int timeout) throws InterruptedException {
    BooleanSupplier enoughMembers = () -> cluster.topologyService().allMembers().size() >= exp;

    return waitForCondition(enoughMembers, timeout);
}
/**
 * Returns the textual IP address of the local host.
 *
 * @return Host address of {@link InetAddress#getLocalHost()}.
 * @throws RuntimeException If the local host could not be resolved; the cause is the original
 *         {@link UnknownHostException}.
 */
private static String getLocalAddress() {
    final InetAddress localHost;

    try {
        localHost = InetAddress.getLocalHost();
    } catch (UnknownHostException e) {
        throw new RuntimeException(e);
    }

    return localHost.getHostAddress();
}
/**
 * Creates and starts a cluster service, and registers it in the {@code cluster} list.
 *
 * @param testInfo Test info.
 * @param port Port of the new node.
 * @param otherPeer Address given to the node finder as the only known node.
 * @return Started cluster service.
 */
private ClusterService clusterService(TestInfo testInfo, int port, NetworkAddress otherPeer) {
    var nodeFinder = new StaticNodeFinder(List.of(otherPeer));

    ClusterService started = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);

    assertThat(started.startAsync(), willCompleteSuccessfully());

    // Registered only after a successful start.
    cluster.add(started);

    return started;
}
/**
 * Starts a raft server together with its own cluster service and starts the test group's raft node on it.
 *
 * <p>The server keeps its raft data in {@code jraft<idx>} and its listener data in {@code db<idx>}, both under the test
 * work directory. Stopping the returned server also stops its cluster service.
 *
 * @param testInfo Test info.
 * @param idx Server index (affects port of the server).
 * @return Server.
 */
private JraftServerImpl startServer(TestInfo testInfo, int idx) {
    var seedAddr = new NetworkAddress(getLocalAddress(), PORT);

    ClusterService service = clusterService(testInfo, PORT + idx, seedAddr);

    Path raftDir = workDir.resolve("jraft" + idx);

    JraftServerImpl raftServer = new JraftServerImpl(service, raftDir, raftConfiguration) {
        @Override
        public CompletableFuture<Void> stopAsync() {
            return IgniteUtils.stopAsync(super::stopAsync, service::stopAsync);
        }
    };

    assertThat(raftServer.startAsync(), willCompleteSuccessfully());

    Path listenerDir = workDir.resolve("db" + idx);

    // Registered before the raft node is started.
    servers.add(raftServer);

    var nodeId = new RaftNodeId(raftGroupId(), initialMemberConf.peer(service.topologyService().localMember().name()));

    RaftGroupListener listener = createListener(service, listenerDir, idx);

    raftServer.startRaftNode(
            nodeId,
            initialMemberConf,
            listener,
            defaults().commandsMarshaller(commandsMarshaller(service))
    );

    return raftServer;
}
/**
 * Prepares raft group service by starting one raft server per peer of the initial configuration and then a client.
 *
 * @param testInfo Test info.
 * @return Raft group service instance.
 * @throws Exception If failed.
 */
private RaftGroupService prepareRaftGroup(TestInfo testInfo) throws Exception {
    int serverCount = initialMemberConf.peers().size();

    for (int idx = 0; idx < serverCount; idx++) {
        startServer(testInfo, idx);
    }

    boolean topologyFormed = waitForTopology(cluster.get(0), servers.size(), 3_000);

    assertTrue(topologyFormed);

    TestReplicationGroupId groupId = raftGroupId();

    var seedAddr = new NetworkAddress(getLocalAddress(), PORT);

    return startClient(testInfo, groupId, seedAddr);
}
/**
* Returns the marshaller for raft commands. It is used both in the raft group options of every started server and by the
* raft client.
*
* @param clusterService Cluster service of the node the marshaller is created for.
* @return Commands marshaller.
*/
protected abstract Marshaller commandsMarshaller(ClusterService clusterService);
/**
 * Starts a raft client on a new cluster node and registers it in the {@code clients} list.
 *
 * @param testInfo Test info.
 * @param groupId Raft group id the client works with.
 * @param addr Address given to the client node's node finder.
 * @return The service.
 */
private RaftGroupService startClient(TestInfo testInfo, TestReplicationGroupId groupId, NetworkAddress addr) {
    int clientPort = CLIENT_PORT + clients.size();

    ClusterService clientNode = clusterService(testInfo, clientPort, addr);

    Marshaller marshaller = commandsMarshaller(clientNode);

    CompletableFuture<RaftGroupService> startFuture = RaftGroupServiceImpl.start(
            groupId,
            clientNode,
            FACTORY,
            raftConfiguration,
            initialMemberConf,
            true,
            executor,
            marshaller
    );

    assertThat(startFuture, willCompleteSuccessfully());

    // The future is already completed here, so join() does not block.
    RaftGroupService client = startFuture.join();

    clients.add(client);

    return client;
}
}