blob: 60ebeafa68c0755b20a6397db15fef15e5a22b6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.raft.client;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.raft.LeaderElectionListener;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.TestRaftGroupListener;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.topology.LogicalTopologyServiceTestImpl;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.CliRequests.LeaderChangeNotification;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
 * Abstract class containing test scenarios for {@link TopologyAwareRaftGroupService} related test classes.
 *
 * <p>Subclasses take part in the scenarios through the {@code afterXxx} hooks, which are invoked at fixed points of the cluster
 * lifecycle (Raft server start, cluster initialization, leader change, leader stop).
 */
@ExtendWith(ConfigurationExtension.class)
public abstract class AbstractTopologyAwareGroupServiceTest extends IgniteAbstractTest {
    /** RAFT message factory. */
    private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();

    /** Base node port. */
    private static final int PORT_BASE = 1234;

    /** Wait timeout, in milliseconds. */
    protected static final int WAIT_TIMEOUT_MILLIS = 10_000;

    /** Replication group used by every scenario. */
    protected static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1");

    /** RPC executor, shared by all topology aware Raft clients started by this test. */
    protected final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(
            20,
            NamedThreadFactory.create("Test", "Raft-Group-Client", log)
    );

    /** Started cluster services keyed by node address; stopped by {@link #stopCluster()}. */
    private final Map<NetworkAddress, ClusterService> clusterServices = new HashMap<>();

    /** Started Raft servers keyed by node address; stopped by {@link #stopCluster()}. */
    private final Map<NetworkAddress, JraftServerImpl> raftServers = new HashMap<>();

    /** Started topology aware Raft clients; shut down by {@link #stopCluster()}. */
    private final List<TopologyAwareRaftGroupService> raftClients = new ArrayList<>();

    @InjectConfiguration
    protected RaftConfiguration raftConfiguration;

    /**
     * Stops all started Raft clients, Raft servers and cluster services, then shuts down the RPC executor.
     *
     * <p>The cluster is stopped first because the Raft clients were started with {@link #executor}. The executor is shut down in a
     * {@code finally} block so that it is not leaked when stopping the cluster fails.
     *
     * @throws Exception If failed.
     */
    @AfterEach
    protected void tearDown() throws Exception {
        try {
            stopCluster();
        } finally {
            IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
        }
    }

    /**
     * The method is called after the Raft server and Raft node of a server node have been started. It is not called for
     * client-only nodes.
     *
     * @param nodeName Node name.
     * @param clusterService Cluster service.
     * @param dataPath Data path for raft node.
     * @param peersAndLearners Peers and learners.
     * @param eventsClientListener Raft events listener for client.
     * @param logicalTopologyService Logical topology service.
     */
    protected abstract void afterNodeStart(
            String nodeName,
            ClusterService clusterService,
            Path dataPath,
            PeersAndLearners peersAndLearners,
            RaftGroupEventsClientListener eventsClientListener,
            LogicalTopologyService logicalTopologyService
    );

    /**
     * Checks the condition after cluster and raft clients initialization.
     *
     * @param leaderName Current leader name.
     */
    protected abstract void afterClusterInit(String leaderName) throws InterruptedException;

    /**
     * Checks the condition after leader change.
     *
     * @param leaderName Current leader name.
     */
    protected abstract void afterLeaderChange(String leaderName) throws InterruptedException;

    /**
     * The method is called after the Raft server of a stopped node has been stopped, before its cluster service is stopped.
     *
     * @param nodeName Node name.
     */
    protected abstract void afterNodeStop(String nodeName) throws Exception;

    @Test
    public void testOneNodeReplicationGroup(TestInfo testInfo) throws Exception {
        // NOTE(review): despite the test name, two nodes are started and every address is treated as a RAFT server
        // (the predicate accepts all addresses); confirm this matches the intended scenario.
        int nodes = 2;

        TopologyAwareRaftGroupService raftClient = startCluster(
                testInfo,
                addr -> true,
                nodes,
                PORT_BASE + 1
        );

        assertNotNull(raftClient);

        CompletableFuture<ClusterNode> leaderFut = new CompletableFuture<>();

        subscribeLeader(raftClient, (node, term) -> leaderFut.complete(node), "New leader: {}");

        ClusterNode leader = leaderFut.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        assertNotNull(leader);

        afterClusterInit(leader.name());
    }

    /**
     * Starts the cluster, Raft clients and subscribes them to leader change notifications. Returns a pair of two topology aware Raft
     * clients. First one will get all leader change notifications, including the initial one (when joining the cluster). Second
     * one will get all notifications without the initial one.
     *
     * @param testInfo Test info.
     * @param leaderRef Atomic reference where the current leader will be put by notification listener of the first raft client.
     * @param leaderRefNoInitialNotify Atomic reference where the current leader will be put by notification listener of the second raft
     *         client.
     * @return Raft clients.
     */
    private IgniteBiTuple<TopologyAwareRaftGroupService, TopologyAwareRaftGroupService> startClusterWithClientsAndSubscribeToLeaderChange(
            TestInfo testInfo,
            AtomicReference<ClusterNode> leaderRef,
            AtomicReference<ClusterNode> leaderRefNoInitialNotify
    ) throws Exception {
        int nodes = 3;

        assertTrue(clusterServices.isEmpty());
        assertTrue(raftServers.isEmpty());

        Predicate<NetworkAddress> isServerAddress = addr -> true;

        // Start cluster and the first topology aware client.
        TopologyAwareRaftGroupService raftClient = startCluster(
                testInfo,
                isServerAddress,
                nodes,
                PORT_BASE
        );

        assertNotNull(raftClient);

        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());

        // Start client service for the second client, on a port outside of the server address range.
        int clientPort = PORT_BASE + nodes + 1;

        List<NetworkAddress> clientAddress = findLocalAddresses(clientPort, clientPort + 1);

        assertEquals(1, clientAddress.size());

        ClusterService clientClusterService =
                clusterService(testInfo, clientPort, new StaticNodeFinder(findLocalAddresses(PORT_BASE, PORT_BASE + nodes)));

        assertThat(clientClusterService.startAsync(), willCompleteSuccessfully());

        // Register for cleanup immediately, so the service is stopped in tearDown even if one of the next steps fails.
        clusterServices.put(clientAddress.get(0), clientClusterService);

        // Start the second topology aware client, that should not get the initial leader notification.
        TopologyAwareRaftGroupService raftClientNoInitialNotify = startTopologyAwareClient(
                clientClusterService,
                clusterServices,
                isServerAddress,
                nodes,
                null,
                new LogicalTopologyServiceTestImpl(clientClusterService),
                false
        );

        // Register for shutdown immediately, for the same reason as above.
        raftClients.add(raftClientNoInitialNotify);

        AtomicInteger callsCount = new AtomicInteger();

        // Subscribing clients.
        subscribeLeader(raftClient, (node, term) -> leaderRef.set(node), "New leader: {}");

        // The second client is re-subscribed twice; the invariant below checks it was notified at most once in total.
        for (int i = 0; i < 2; i++) {
            unsubscribeLeader(raftClientNoInitialNotify);

            subscribeLeader(raftClientNoInitialNotify, (node, term) -> {
                callsCount.incrementAndGet();
                leaderRefNoInitialNotify.set(node);
            }, "New leader (client without initial notification): {}");
        }

        // Checking invariants.
        assertTrue(callsCount.get() <= 1);

        assertTrue(waitForCondition(() -> leaderRef.get() != null, WAIT_TIMEOUT_MILLIS));

        ClusterNode leader = leaderRef.get();

        assertNotNull(leader);

        log.info("Leader: {}", leader);

        afterClusterInit(leader.name());

        return new IgniteBiTuple<>(raftClient, raftClientNoInitialNotify);
    }

    @Test
    public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws Exception {
        AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
        AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>();

        IgniteBiTuple<TopologyAwareRaftGroupService, TopologyAwareRaftGroupService> clients =
                startClusterWithClientsAndSubscribeToLeaderChange(
                        testInfo,
                        leaderRef,
                        leaderRefNoInitialNotify
                );

        TopologyAwareRaftGroupService raftClientNoInitialNotify = clients.get2();

        ClusterNode leader = leaderRef.get();

        assertNull(leaderRefNoInitialNotify.get());

        // Forcing the leader change by stopping the actual leader.
        NetworkAddress leaderAddress = new NetworkAddress("localhost", leader.address().port());

        JraftServerImpl raftServiceToStop = raftServers.remove(leaderAddress);

        assertNotNull(raftServiceToStop);

        raftServiceToStop.stopRaftNodes(GROUP_ID);

        assertThat(raftServiceToStop.stopAsync(), willCompleteSuccessfully());

        afterNodeStop(leader.name());

        ClusterService clusterServiceToStop = clusterServices.remove(leaderAddress);

        assertNotNull(clusterServiceToStop);

        assertThat(clusterServiceToStop.stopAsync(), willCompleteSuccessfully());

        // Waiting for the notifications to check. The second reference starts as null, and null never equals the old leader,
        // so a non-null check is required to actually wait for the second client's notification.
        assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), WAIT_TIMEOUT_MILLIS));
        assertTrue(waitForCondition(
                () -> leaderRefNoInitialNotify.get() != null && !leader.equals(leaderRefNoInitialNotify.get()),
                WAIT_TIMEOUT_MILLIS
        ));

        log.info("New Leader: {}", leaderRef.get());

        afterLeaderChange(leaderRef.get().name());

        assertThat(raftClientNoInitialNotify.refreshLeader(), willCompleteSuccessfully());

        assertEquals(leaderRef.get().name(), raftClientNoInitialNotify.leader().consistentId());
    }

    @Test
    public void testChangeLeaderForce(TestInfo testInfo) throws Exception {
        AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
        AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>();

        IgniteBiTuple<TopologyAwareRaftGroupService, TopologyAwareRaftGroupService> clients =
                startClusterWithClientsAndSubscribeToLeaderChange(
                        testInfo,
                        leaderRef,
                        leaderRefNoInitialNotify
                );

        TopologyAwareRaftGroupService raftClient = clients.get1();

        ClusterNode leader = leaderRef.get();

        assertNull(leaderRefNoInitialNotify.get());

        // Forcing the leader change by transferring leadership.
        Peer newLeaderPeer = raftClient.peers().stream()
                .filter(peer -> !leader.name().equals(peer.consistentId()))
                .findAny()
                .orElseThrow();

        log.info("Peer to transfer leader: {}", newLeaderPeer);

        assertThat(raftClient.transferLeadership(newLeaderPeer), willCompleteSuccessfully());

        String leaderId = newLeaderPeer.consistentId();

        // Waiting for the notifications to check.
        assertTrue(waitForCondition(() -> leaderId.equals(leaderRef.get().name()), WAIT_TIMEOUT_MILLIS));
        assertTrue(waitForCondition(
                () -> leaderRefNoInitialNotify.get() != null && leaderId.equals(leaderRefNoInitialNotify.get().name()),
                WAIT_TIMEOUT_MILLIS
        ));

        log.info("New Leader: {}", leaderRef.get());

        afterLeaderChange(leaderRef.get().name());

        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());

        assertEquals(leaderRef.get().name(), raftClient.leader().consistentId());
    }

    /**
     * Stops cluster: shuts down all registered Raft clients, then stops, node by node, the Raft server (if any) and the cluster
     * service.
     *
     * @throws Exception If failed.
     */
    private void stopCluster() throws Exception {
        if (!CollectionUtils.nullOrEmpty(raftClients)) {
            raftClients.forEach(TopologyAwareRaftGroupService::shutdown);

            raftClients.clear();
        }

        for (Map.Entry<NetworkAddress, ClusterService> entry : clusterServices.entrySet()) {
            JraftServerImpl raftServer = raftServers.get(entry.getKey());

            // Client-only nodes have a cluster service but no Raft server.
            if (raftServer != null) {
                raftServer.stopRaftNodes(GROUP_ID);

                assertThat(raftServer.stopAsync(), willCompleteSuccessfully());
            }

            assertThat(entry.getValue().stopAsync(), willCompleteSuccessfully());
        }

        raftServers.clear();
        clusterServices.clear();
    }

    /**
     * Starts cluster.
     *
     * @param testInfo Test info.
     * @param isServerAddress Closure to determine a server node.
     * @param nodes Node count.
     * @param clientPort Port of node where a client will start; that node must be a server node.
     * @return Topology aware client started on the node with port {@code clientPort}, or {@code null} if no node has that port.
     */
    private @Nullable TopologyAwareRaftGroupService startCluster(
            TestInfo testInfo,
            Predicate<NetworkAddress> isServerAddress,
            int nodes,
            int clientPort
    ) {
        List<NetworkAddress> addresses = findLocalAddresses(PORT_BASE, PORT_BASE + nodes);

        var nodeFinder = new StaticNodeFinder(addresses);

        TopologyAwareRaftGroupService raftClient = null;

        // All cluster services are started first: peer names are taken from their local members.
        for (NetworkAddress addr : addresses) {
            var cluster = clusterService(testInfo, addr.port(), nodeFinder);

            assertThat(cluster.startAsync(), willCompleteSuccessfully());

            clusterServices.put(addr, cluster);
        }

        PeersAndLearners peersAndLearners = peersAndLearners(clusterServices, isServerAddress, nodes);

        for (NetworkAddress addr : addresses) {
            ClusterService cluster = clusterServices.get(addr);

            LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceTestImpl(cluster);

            RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener();

            if (isServerAddress.test(addr)) { // RAFT server node
                Peer localPeer = peersAndLearners.peers().stream()
                        .filter(peer -> peer.consistentId().equals(cluster.topologyService().localMember().name()))
                        .findAny()
                        .orElseThrow();

                var dataPath = workDir.resolve("raft_" + localPeer.consistentId());

                var commandsMarshaller = new ThreadLocalOptimizedMarshaller(cluster.serializationRegistry());

                NodeOptions nodeOptions = new NodeOptions();
                nodeOptions.setCommandsMarshaller(commandsMarshaller);

                var raftServer = new JraftServerImpl(
                        cluster,
                        dataPath,
                        raftConfiguration,
                        nodeOptions,
                        eventsClientListener
                );

                assertThat(raftServer.startAsync(), willCompleteSuccessfully());

                raftServer.startRaftNode(
                        new RaftNodeId(GROUP_ID, localPeer),
                        peersAndLearners,
                        new TestRaftGroupListener(),
                        RaftGroupOptions.defaults().commandsMarshaller(commandsMarshaller)
                );

                raftServers.put(addr, raftServer);

                afterNodeStart(localPeer.consistentId(), cluster, dataPath, peersAndLearners, eventsClientListener, logicalTopologyService);
            }

            if (addr.port() == clientPort) {
                // The client reuses the events listener that was handed to the local Raft server above.
                assertTrue(isServerAddress.test(addr));

                raftClient = startTopologyAwareClient(cluster, clusterServices, isServerAddress, nodes, eventsClientListener,
                        logicalTopologyService, true);

                raftClients.add(raftClient);
            }
        }

        return raftClient;
    }

    /**
     * Starts a topology aware Raft client for {@link #GROUP_ID} on the given node.
     *
     * @param localClusterService Cluster service of the node the client runs on.
     * @param clusterServices Cluster services used to resolve the names of the Raft peers.
     * @param isServerAddress Closure to determine a server node.
     * @param nodes Node count.
     * @param eventsClientListener Raft events listener for the client. If {@code null}, a new listener is created and fed with
     *         {@link LeaderChangeNotification} messages received by {@code localClusterService}.
     * @param logicalTopologyService Logical topology service.
     * @param notifyOnSubscription Passed to {@link TopologyAwareRaftGroupService#start}; {@code false} is used for the client
     *         that should not get the initial leader notification.
     * @return Started client.
     */
    private TopologyAwareRaftGroupService startTopologyAwareClient(
            ClusterService localClusterService,
            Map<NetworkAddress, ClusterService> clusterServices,
            Predicate<NetworkAddress> isServerAddress,
            int nodes,
            @Nullable RaftGroupEventsClientListener eventsClientListener,
            LogicalTopologyService logicalTopologyService,
            boolean notifyOnSubscription
    ) {
        RaftGroupEventsClientListener listener = eventsClientListener != null
                ? eventsClientListener
                : createLeaderChangeListener(localClusterService);

        var commandsMarshaller = new ThreadLocalOptimizedMarshaller(localClusterService.serializationRegistry());

        return TopologyAwareRaftGroupService.start(
                GROUP_ID,
                localClusterService,
                FACTORY,
                raftConfiguration,
                peersAndLearners(clusterServices, isServerAddress, nodes),
                true,
                executor,
                logicalTopologyService,
                listener,
                notifyOnSubscription,
                commandsMarshaller
        ).join();
    }

    /**
     * Creates a Raft events listener that is fed with the {@link LeaderChangeNotification} messages received by the given node.
     *
     * @param clusterService Cluster service of the node that receives the notifications.
     * @return New listener.
     */
    private static RaftGroupEventsClientListener createLeaderChangeListener(ClusterService clusterService) {
        RaftGroupEventsClientListener listener = new RaftGroupEventsClientListener();

        clusterService.messagingService().addMessageHandler(RaftMessageGroup.class, (msg, sender, correlationId) -> {
            if (msg instanceof LeaderChangeNotification) {
                LeaderChangeNotification notification = (LeaderChangeNotification) msg;

                listener.onLeaderElected(notification.groupId(), sender, notification.term());
            }
        });

        return listener;
    }

    /**
     * Builds the Raft group membership from the consistent IDs of the server nodes among the first {@code nodes} local addresses.
     *
     * @param clusterServices Started cluster services; must contain every address in the server range.
     * @param isServerAddress Closure to determine a server node.
     * @param nodes Node count.
     * @return Peers and learners.
     */
    private static PeersAndLearners peersAndLearners(
            Map<NetworkAddress, ClusterService> clusterServices,
            Predicate<NetworkAddress> isServerAddress,
            int nodes
    ) {
        return PeersAndLearners.fromConsistentIds(
                findLocalAddresses(PORT_BASE, PORT_BASE + nodes).stream()
                        .filter(isServerAddress)
                        .map(netAddr -> clusterServices.get(netAddr).topologyService().localMember().name())
                        .collect(toSet())
        );
    }

    /**
     * Subscribes the client to leader change notifications, logging every notification after passing it to {@code callback}.
     *
     * @param client Client to subscribe.
     * @param callback Leader election callback.
     * @param logMessage Log message pattern; the new leader node is its only argument.
     */
    private void subscribeLeader(TopologyAwareRaftGroupService client, LeaderElectionListener callback, String logMessage) {
        CompletableFuture<Void> future = client.subscribeLeader((node, term) -> {
            callback.onLeaderElected(node, term);

            log.info(logMessage, node);
        });

        assertThat(future, willCompleteSuccessfully());
    }

    /**
     * Unsubscribes the client from leader change notifications and waits for the operation to complete.
     *
     * @param client Client to unsubscribe.
     */
    private static void unsubscribeLeader(TopologyAwareRaftGroupService client) {
        CompletableFuture<Void> future = client.unsubscribeLeader();

        assertThat(future, willCompleteSuccessfully());
    }
}