blob: c1401c06f82396c6a741d845cfb39d6c330316aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.raft;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
/**
* Integration tests for the Raft group service.
*/
@ExtendWith(MockitoExtension.class)
@ExtendWith(ConfigurationExtension.class)
public class ItRaftGroupServiceTest extends IgniteAbstractTest {
/** Number of nodes that {@code setUp} starts before every test. */
private static final int NODES_CNT = 2;
/** Base port: a node listens on this value plus the number of nodes already started when it is created. */
private static final int NODE_PORT_BASE = 20_000;
/** Replication group ID used for every Raft node started by this test. */
private static final TestReplicationGroupId RAFT_GROUP_NAME = new TestReplicationGroupId("part1");
/**
 * Node finder handed to the cluster service of every node. It is built from the local addresses for the port range
 * {@code NODE_PORT_BASE .. NODE_PORT_BASE + NODES_CNT} (presumably end-exclusive, i.e. the initial nodes only - confirm).
 */
private static final NodeFinder NODE_FINDER = new StaticNodeFinder(findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + NODES_CNT));
/** Nodes started so far in the current test, in start order; stopped in {@code tearDown}. */
private final List<TestNode> nodes = new ArrayList<>();
/** Mock passed as the events listener to every started Raft group node; tests stub it to observe events. */
@Mock
private RaftGroupEventsListener eventsListener;
/** Raft configuration injected by the configuration test extension and passed to every {@code Loza} instance. */
@InjectConfiguration
private RaftConfiguration raftConfiguration;
/**
 * Starts {@link #NODES_CNT} nodes and then starts a Raft group node on each of them, using a configuration in which
 * every started node is a peer. Fails if any of the Raft group services does not become ready.
 *
 * @param testInfo Test info used to create the nodes.
 */
@BeforeEach
public void setUp(TestInfo testInfo) {
    for (int started = 0; started < NODES_CNT; started++) {
        startNode(testInfo);
    }

    // All nodes started above act as peers of the initial configuration.
    PeersAndLearners initialConfiguration = nodes.stream()
            .map(TestNode::name)
            .collect(collectingAndThen(toSet(), PeersAndLearners::fromConsistentIds));

    // Start the Raft group on every node, in start order, and remember the resulting futures.
    CompletableFuture<?>[] groupStarts = new CompletableFuture<?>[nodes.size()];

    for (int idx = 0; idx < groupStarts.length; idx++) {
        groupStarts[idx] = nodes.get(idx).startRaftGroup(initialConfiguration);
    }

    assertThat(CompletableFuture.allOf(groupStarts), willCompleteSuccessfully());
}
/**
 * Creates a new node, starts it and registers it in {@link #nodes}.
 *
 * @param testInfo Test info used to create the node.
 * @return The started node.
 */
private TestNode startNode(TestInfo testInfo) {
    // The node must be created before it is added to the list: its constructor derives
    // the port and the working directory from the current size of the list.
    TestNode startedNode = new TestNode(testInfo);

    startedNode.start();

    nodes.add(startedNode);

    return startedNode;
}
/**
 * Stops all started nodes in two phases: first {@code beforeNodeStop} is invoked on every node, then {@code stop}.
 *
 * <p>The second phase is executed even if the first one fails, so that a failure while preparing the nodes for stop
 * does not leave them running after the test. If both phases fail, the failure of the first phase is thrown and the
 * failure of the second phase is attached to it as a suppressed exception.
 *
 * @throws Exception If any of the nodes failed to stop.
 */
@AfterEach
public void tearDown() throws Exception {
    try {
        IgniteUtils.closeAll(nodes.parallelStream().map(node -> node::beforeNodeStop));
    } catch (Throwable beforeStopFailure) {
        // Still stop the nodes, but report the original failure as the primary one.
        try {
            stopAllNodes();
        } catch (Throwable stopFailure) {
            beforeStopFailure.addSuppressed(stopFailure);
        }

        throw beforeStopFailure;
    }

    stopAllNodes();
}

/** Invokes {@code stop} on every started node. */
private void stopAllNodes() throws Exception {
    IgniteUtils.closeAll(nodes.parallelStream().map(node -> node::stop));
}
/**
 * Transfers leadership from the current leader to the other node and checks that the Raft group services of both
 * nodes report the new leader.
 */
@Test
@Timeout(20)
public void testTransferLeadership() {
    CompletableFuture<RaftGroupService> firstNodeService = nodes.get(0).raftGroupService;

    assertThat(firstNodeService, willCompleteSuccessfully());

    Peer currentLeader = firstNodeService.join().leader();
    String currentLeaderName = currentLeader.consistentId();

    // The node that currently hosts the leader.
    TestNode oldLeaderNode = nodes.stream()
            .filter(candidate -> candidate.name().equals(currentLeaderName))
            .findFirst()
            .orElseThrow();

    // Any other node: it is the target of the leadership transfer.
    TestNode newLeaderNode = nodes.stream()
            .filter(candidate -> !candidate.name().equals(currentLeaderName))
            .findFirst()
            .orElseThrow();

    Peer expectedLeader = new Peer(newLeaderNode.name());

    CompletableFuture<Void> transferFuture = oldLeaderNode.raftGroupService
            .thenCompose(service -> service.transferLeadership(expectedLeader));

    assertThat(transferFuture, willCompleteSuccessfully());

    // The service that performed the transfer must already know the new leader.
    assertThat(oldLeaderNode.raftGroupService.thenApply(RaftGroupService::leader), willBe(expectedLeader));

    // The service on the other node learns about the new leader only after refreshing it, possibly not at once.
    boolean newLeaderObserved = waitForCondition(() -> {
        CompletableFuture<RaftGroupService> service = newLeaderNode.raftGroupService;

        assertThat(service.thenCompose(RaftGroupService::refreshLeader), willCompleteSuccessfully());

        Peer observedLeader = service.join().leader();

        return expectedLeader.equals(observedLeader);
    }, 10_000);

    assertTrue(newLeaderObserved);
}
/**
 * Starts additional follower and learner nodes, asynchronously changes the Raft configuration to include them and
 * checks that every node observes the new sets of peers and learners.
 *
 * @param testInfo Test info used to create the additional nodes.
 * @throws InterruptedException If interrupted while waiting for the new configuration to be applied.
 */
@Test
public void testChangePeersAsync(TestInfo testInfo) throws InterruptedException {
    // Two extra nodes that join the group as followers.
    TestNode firstFollower = startNode(testInfo);
    TestNode secondFollower = startNode(testInfo);

    // Peers of the target configuration are all the nodes started so far: the initial ones plus the new followers.
    Set<String> peerNames = nodes.stream().map(TestNode::name).collect(toSet());

    // Two more nodes that join the group as learners.
    TestNode firstLearner = startNode(testInfo);
    TestNode secondLearner = startNode(testInfo);

    Set<String> learnerNames = Stream.of(firstLearner, secondLearner).map(TestNode::name).collect(toSet());

    PeersAndLearners targetConfiguration = PeersAndLearners.fromConsistentIds(peerNames, learnerNames);

    // Only the new nodes need a Raft group node started; they are started with the target configuration.
    CompletableFuture<?>[] groupStarts = Stream.of(firstFollower, secondFollower, firstLearner, secondLearner)
            .map(newNode -> newNode.startRaftGroup(targetConfiguration))
            .toArray(CompletableFuture[]::new);

    assertThat(CompletableFuture.allOf(groupStarts), willCompleteSuccessfully());

    // The events listener mock releases the latch once a new peers configuration has been applied.
    CountDownLatch configurationApplied = new CountDownLatch(1);

    doAnswer(invocation -> {
        configurationApplied.countDown();

        return null;
    }).when(eventsListener).onNewPeersConfigurationApplied(any());

    // Request the configuration change using the term of the current leader.
    CompletableFuture<Void> changePeers = nodes.get(0).raftGroupService.thenCompose(service ->
            service.refreshAndGetLeaderWithTerm()
                    .thenCompose(leaderWithTerm -> service.changePeersAsync(targetConfiguration, leaderWithTerm.term()))
    );

    assertThat(changePeers, willCompleteSuccessfully());

    assertTrue(configurationApplied.await(10, TimeUnit.SECONDS));

    // Every node, old and new, must see exactly the target peers and learners after refreshing its members.
    Object[] expectedPeers = targetConfiguration.peers().toArray();
    Object[] expectedLearners = targetConfiguration.learners().toArray();

    for (TestNode node : nodes) {
        CompletableFuture<RaftGroupService> service = node.raftGroupService;

        assertThat(service.thenCompose(svc -> svc.refreshMembers(true)), willCompleteSuccessfully());

        assertThat(service.thenApply(RaftGroupService::peers), will(containsInAnyOrder(expectedPeers)));

        assertThat(service.thenApply(RaftGroupService::learners), will(containsInAnyOrder(expectedLearners)));
    }
}
/**
 * A single node of the test cluster: a cluster service plus a {@link Loza} instance built on top of it, and
 * optionally the Raft group service started on this node.
 */
private class TestNode {
    /** Network layer of the node. */
    private final ClusterService clusterService;

    /** Raft manager of the node. */
    private final Loza loza;

    /** Future of the Raft group service; {@code null} until {@link #startRaftGroup} has assigned it. */
    private CompletableFuture<RaftGroupService> raftGroupService;

    /**
     * Creates the node's components without starting them. The port and the working directory are derived from the
     * number of nodes registered so far.
     *
     * @param testInfo Test info used to create the cluster service.
     */
    TestNode(TestInfo testInfo) {
        int nodeIdx = nodes.size();

        clusterService = ClusterServiceTestUtils.clusterService(testInfo, NODE_PORT_BASE + nodeIdx, NODE_FINDER);

        loza = new Loza(
                clusterService,
                new NoOpMetricManager(),
                raftConfiguration,
                workDir.resolve("node" + nodeIdx),
                new HybridClockImpl()
        );
    }

    /** Returns the name of the local cluster member. */
    String name() {
        return clusterService.topologyService().localMember().name();
    }

    /** Starts the cluster service and the Raft manager and asserts that the start has succeeded. */
    void start() {
        assertThat(startAsync(clusterService, loza), willCompleteSuccessfully());
    }

    /**
     * Starts a Raft group node on this node and remembers the future of the resulting Raft group service.
     *
     * @param configuration Peers and learners of the group; this node is looked up in it by name, first among the
     *         peers and then among the learners.
     * @return Future of the Raft group service, or a failed future if the node is stopping.
     */
    CompletableFuture<RaftGroupService> startRaftGroup(PeersAndLearners configuration) {
        String localName = name();

        Peer localPeer = configuration.peer(localName);

        if (localPeer == null) {
            localPeer = configuration.learner(localName);
        }

        RaftNodeId nodeId = new RaftNodeId(RAFT_GROUP_NAME, localPeer);

        try {
            raftGroupService = loza.startRaftGroupNodeAndWaitNodeReadyFuture(
                    nodeId,
                    configuration,
                    mock(RaftGroupListener.class),
                    eventsListener
            );

            return raftGroupService;
        } catch (NodeStoppingException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Prepares the node for stop: shuts down the Raft group service (if it has been started), stops the local Raft
     * nodes and then invokes {@code beforeNodeStop} on the Raft manager and the cluster service.
     *
     * @throws Exception If any of the steps failed.
     */
    void beforeNodeStop() throws Exception {
        // Stays null when no Raft group was started on this node; the null element is passed on to closeAll.
        AutoCloseable serviceShutdown = null;

        if (raftGroupService != null) {
            serviceShutdown = () -> raftGroupService.get(1, TimeUnit.SECONDS).shutdown();
        }

        Stream<AutoCloseable> serviceStep = Stream.of(serviceShutdown);

        Stream<AutoCloseable> raftNodeSteps = loza.localNodes().stream().map(nodeId -> () -> loza.stopRaftNode(nodeId));

        Stream<AutoCloseable> componentSteps = Stream.of(loza::beforeNodeStop, clusterService::beforeNodeStop);

        // The steps are concatenated lazily, in the order: service, Raft nodes, components.
        IgniteUtils.closeAll(Stream.of(serviceStep, raftNodeSteps, componentSteps).flatMap(Function.identity()));
    }

    /** Stops the Raft manager and the cluster service and asserts that the stop has succeeded. */
    void stop() {
        assertThat(stopAsync(loza, clusterService), willCompleteSuccessfully());
    }
}
}