blob: 2970093e4582a4697418398f86c7d3776369b5a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.jraft.rpc;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.raft.jraft.test.TestUtils.INIT_PORT;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestInfo;
/**
 * Ignite flavour of the RPC test: supplies the server and the clients under test,
 * each one backed by its own {@link ClusterService}.
 */
public class IgniteRpcTest extends AbstractRpcTest {
    private static final IgniteLogger LOG = Loggers.forClass(IgniteRpcTest.class);

    /** Counts created clients; the value offsets each client's port and sizes the expected topology. */
    private final AtomicInteger clientCounter = new AtomicInteger();

    /** Executor handed to the RPC server; assigned in {@link #createServer()}. */
    private ExecutorService requestExecutor;

    /** Test info passed to every cluster service created by this test. */
    private final TestInfo testInfo;

    /**
     * Constructor.
     *
     * @param testInfo Test info.
     */
    public IgniteRpcTest(TestInfo testInfo) {
        this.testInfo = testInfo;
    }

    /** Shuts the request executor down after each test and waits for its termination. */
    @AfterEach
    public void shutdownExecutor() {
        ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor);
    }

    /**
     * Creates the RPC server on top of a cluster service created with {@code INIT_PORT}
     * and an empty static node finder, then starts that cluster service.
     *
     * @return The server; its {@code shutdown()} also stops the underlying cluster service.
     */
    @Override
    public RpcServer<?> createServer() {
        StaticNodeFinder emptyFinder = new StaticNodeFinder(Collections.emptyList());

        ClusterService serverService = ClusterServiceTestUtils.clusterService(testInfo, INIT_PORT, emptyFinder);

        NodeOptions opts = new NodeOptions();

        requestExecutor = JRaftUtils.createRequestExecutor(opts);

        TestIgniteRpcServer rpcServer = new TestIgniteRpcServer(serverService, new NodeManager(), opts, requestExecutor) {
            @Override
            public void shutdown() {
                super.shutdown();

                // The server owns its cluster service, so stop it together with the server.
                assertThat(serverService.stopAsync(), willCompleteSuccessfully());
            }
        };

        assertThat(serverService.startAsync(), willCompleteSuccessfully());

        return rpcServer;
    }

    /**
     * Creates an RPC client on top of a freshly started cluster service that uses the
     * local address with {@code INIT_PORT} as its only seed, then waits up to 5 seconds
     * for the topology to reach the expected size.
     *
     * @return The client; its {@code shutdown()} also stops the underlying cluster service.
     */
    @Override
    public RpcClient createClient0() {
        int clientNo = clientCounter.incrementAndGet();

        NetworkAddress seedAddress = new NetworkAddress(TestUtils.getLocalAddress(), INIT_PORT);
        StaticNodeFinder seedFinder = new StaticNodeFinder(List.of(seedAddress));

        // Every client takes its own port, counting down from INIT_PORT.
        ClusterService clientService = ClusterServiceTestUtils.clusterService(testInfo, INIT_PORT - clientNo, seedFinder);

        IgniteRpcClient rpcClient = new IgniteRpcClient(clientService) {
            @Override
            public void shutdown() {
                super.shutdown();

                assertThat(clientService.stopAsync(), willCompleteSuccessfully());
            }
        };

        assertThat(clientService.startAsync(), willCompleteSuccessfully());

        // Expected size is one plus the number of clients created so far.
        // The outcome is not checked here; a failed wait is only logged by waitForTopology.
        waitForTopology(rpcClient, 1 + clientNo, 5_000);

        return rpcClient;
    }

    /**
     * Waits until the topology seen by the client's cluster service has the expected size,
     * logging the actual topology if the wait does not succeed.
     *
     * @param client Client whose cluster service is inspected; must be an {@link IgniteRpcClient}.
     * @param expected Expected topology size.
     * @param timeout Timeout passed to {@code TestUtils.waitForTopology}.
     * @return {@code true} if the wait succeeded, {@code false} otherwise.
     */
    @Override
    protected boolean waitForTopology(RpcClient client, int expected, long timeout) {
        ClusterService clusterService = ((IgniteRpcClient) client).clusterService();

        if (TestUtils.waitForTopology(clusterService, expected, timeout)) {
            return true;
        }

        Collection<ClusterNode> members = clusterService.topologyService().allMembers();

        var memberNames = members.stream().map(ClusterNode::name).collect(toList());

        LOG.error("Topology on node '{}' didn't match expected topology size. Expected: {}, actual: {}.\nTopology nodes: {}",
                clusterService.nodeName(),
                expected,
                members.size(),
                memberNames
        );

        return false;
    }
}