blob: a5c7522ecf92c54c1589177c1ab23febd148e8ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.server;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.server.counter.CounterListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Integration tests with hybrid logical clock.
*/
class ItJraftHlcServerTest extends RaftServerAbstractTest {
/**
* The server port offset.
*/
private static final int PORT = 5003;
/**
* Initial configuration.
*/
private PeersAndLearners initialConf;
/**
* Listener factory.
*/
private final Supplier<CounterListener> listenerFactory = CounterListener::new;
/**
* Servers list.
*/
private final List<JraftServerImpl> servers = new ArrayList<>();
@BeforeEach
void setUp() {
initialConf = IntStream.rangeClosed(0, 2)
.mapToObj(i -> testNodeName(testInfo, PORT + i))
.collect(collectingAndThen(toSet(), PeersAndLearners::fromConsistentIds));
}
/**
* After each.
*/
@AfterEach
@Override
protected void after() throws Exception {
super.after();
logger().info("Start server shutdown servers={}", servers.size());
Iterator<JraftServerImpl> iterSrv = servers.iterator();
while (iterSrv.hasNext()) {
JraftServerImpl server = iterSrv.next();
iterSrv.remove();
for (RaftNodeId nodeId : server.localNodes()) {
server.stopRaftNode(nodeId);
}
server.beforeNodeStop();
assertThat(server.stopAsync(), willCompleteSuccessfully());
}
TestUtils.assertAllJraftThreadsStopped();
}
/**
* Starts server.
*
* @param idx The index.
* @param clo Init closure.
* @param cons Node options updater.
*
* @return Raft server instance.
*/
private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, Consumer<NodeOptions> cons) {
var addr = new NetworkAddress(getLocalAddress(), PORT);
ClusterService service = clusterService(PORT + idx, List.of(addr), true);
NodeOptions opts = new NodeOptions();
cons.accept(opts);
JraftServerImpl server = jraftServer(servers, idx, service, opts);
assertThat(server.startAsync(), willCompleteSuccessfully());
clo.accept(server);
servers.add(server);
assertTrue(waitForTopology(service, servers.size(), 15_000));
return server;
}
/**
* Checks that only one instance of clock is created per Ignite node.
*/
@Test
public void testHlcOneInstancePerIgniteNode() {
ThreadLocalOptimizedMarshaller commandsMarshaller = new ThreadLocalOptimizedMarshaller(defaultSerializationRegistry());
startServer(0, raftServer -> {
String localNodeName = raftServer.clusterService().topologyService().localMember().name();
Peer localNode = initialConf.peer(localNodeName);
var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group"), localNode);
raftServer.startRaftNode(nodeId, initialConf, listenerFactory.get(), defaults().commandsMarshaller(commandsMarshaller));
}, opts -> {});
servers.forEach(srv -> {
String localNodeName = srv.clusterService().topologyService().localMember().name();
Peer localNode = initialConf.peer(localNodeName);
for (int i = 0; i < 5; i++) {
var nodeId = new RaftNodeId(new TestReplicationGroupId("test_raft_group_" + i), localNode);
srv.startRaftNode(nodeId, initialConf, listenerFactory.get(), defaults().commandsMarshaller(commandsMarshaller));
}
});
servers.forEach(srv -> {
List<RaftGroupService> grp = srv.localNodes().stream().map(srv::raftGroupService).collect(toList());
assertTrue(grp.size() > 1);
HybridClock clock = ((NodeImpl) grp.get(0).getRaftNode()).clock();
grp.forEach(grp0 -> assertSame(clock, ((NodeImpl) grp0.getRaftNode()).clock()));
});
servers.forEach(srv -> {
srv.stopRaftNodes(new TestReplicationGroupId("test_raft_group"));
for (int i = 0; i < 10; i++) {
srv.stopRaftNodes(new TestReplicationGroupId("test_raft_group_" + i));
}
});
}
}