blob: a3f313ec28dc73e823db4cff1a8d3b628d461bbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.netty;
import static java.util.Collections.emptyList;
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.handler.codec.DecoderException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests for {@link ConnectionManager}.
*/
@ExtendWith(ConfigurationExtension.class)
public class ItConnectionManagerTest extends BaseIgniteAbstractTest {
/** Started connection managers; every wrapper created by {@code startManager} is added here and closed in {@code tearDown}. */
private final List<ConnectionManagerWrapper> startedManagers = new ArrayList<>();
/** Message factory used to build the test messages sent between managers. */
private final TestMessagesFactory messageFactory = new TestMessagesFactory();
/** Test message built without setting any field; used as a payload by the acknowledgement-related tests. */
private final TestMessage emptyTestMessage = messageFactory.testMessage().build();
/** Reusable network configuration object; its port is updated on every {@code startManager} call. */
@InjectConfiguration
private NetworkConfiguration networkConfiguration;
/** Info about the currently running test, stored by {@code setTestInfo} and used to derive node names. */
private TestInfo testInfo;
/**
* Stores the {@link TestInfo} of the test about to run so that {@code startManager} can build node names from it.
*
* @param testInfo Info about the current test.
*/
@BeforeEach
void setTestInfo(TestInfo testInfo) {
this.testInfo = testInfo;
}
/**
* Closes every connection manager wrapper that was registered in {@code startedManagers} during the test.
*
* @throws Exception If closing fails.
*/
@AfterEach
final void tearDown() throws Exception {
closeAll(startedManagers);
}
/**
 * Verifies that a message sent through a channel opened by one connection manager is delivered to a listener
 * registered on the other connection manager.
 *
 * @throws Exception If failed.
 */
@Test
public void testSentSuccessfully() throws Exception {
    String expectedText = "test";

    TestMessage outgoing = messageFactory.testMessage().msg(expectedText).build();

    try (ConnectionManagerWrapper senderSide = startManager(4000);
            ConnectionManagerWrapper receiverSide = startManager(4001)) {
        CompletableFuture<NetworkMessage> delivered = new CompletableFuture<>();

        // The first message seen by the receiving side completes the future.
        receiverSide.connectionManager.addListener(inObj -> delivered.complete(inObj.message()));

        NettySender channel = senderSide.openChannelTo(receiverSide).get(3, TimeUnit.SECONDS);

        channel.send(new OutNetworkObject(outgoing, emptyList())).get(3, TimeUnit.SECONDS);

        TestMessage incoming = (TestMessage) delivered.get(3, TimeUnit.SECONDS);

        assertEquals(expectedText, incoming.msg());
    }
}
/**
 * Tests that incoming connection is reused for sending messages.
 *
 * <p>Manager 1 opens a channel to manager 2; when manager 2 then asks for a channel to manager 1, the sender it gets
 * must be backed by the same connection (its remote address equals the local address of the 1-to-2 channel).
 *
 * @throws Exception If failed.
 */
@Test
public void testReuseIncomingConnection() throws Exception {
    TestMessage testMessage = messageFactory.testMessage().msg("test").build();

    int port1 = 4000;
    int port2 = 4001;

    try (ConnectionManagerWrapper manager1 = startManager(port1);
            ConnectionManagerWrapper manager2 = startManager(port2)) {
        var receivedAt1 = new CompletableFuture<NetworkMessage>();

        manager1.connectionManager.addListener((obj) -> receivedAt1.complete(obj.message()));

        NettySender senderFrom1to2 = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);

        assertThat(senderFrom1to2, is(notNullValue()));

        // Ensure a handshake has finished on both sides by sending a message.
        assertThat(senderFrom1to2.send(new OutNetworkObject(testMessage, emptyList())), willCompleteSuccessfully());

        NettySender senderFrom2to1 = manager2.openChannelTo(manager1).get(3, TimeUnit.SECONDS);

        assertThat(senderFrom2to1, is(notNullValue()));

        // Same connection seen from both ends: local address on the opener equals remote address on the acceptor.
        InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
        InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();

        assertEquals(clientLocalAddress, clientRemoteAddress);

        assertThat(
                senderFrom2to1.send(new OutNetworkObject(messageFactory.testMessage().msg("2->1").build(), emptyList())),
                willCompleteSuccessfully()
        );

        NetworkMessage receivedMessage = receivedAt1.get(3, TimeUnit.SECONDS);

        assertThat(((TestMessage) receivedMessage).msg(), is("2->1"));
    }
}
/**
 * Tests that the resources of a connection manager are closed after a shutdown.
 *
 * <p>Channels are opened in both directions first. Then each manager is closed, and its server and clients
 * (both obtained before the close) are checked to be stopped.
 *
 * @throws Exception If failed.
 */
@Test
public void testShutdown() throws Exception {
    int port1 = 4000;
    int port2 = 4001;

    ConnectionManagerWrapper manager1 = startManager(port1);
    ConnectionManagerWrapper manager2 = startManager(port2);

    NettySender sender1 = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);

    // Wait for the channel to appear on the recipient side. The result is asserted so that a timed-out wait
    // fails right here instead of letting the test continue with an unmet precondition.
    assertTrue(
            waitForCondition(() -> !manager2.channels().isEmpty(), 10_000),
            "Channel did not appear on the recipient side in time"
    );

    NettySender sender2 = manager2.openChannelTo(manager1).get(3, TimeUnit.SECONDS);

    assertNotNull(sender1);
    assertNotNull(sender2);

    for (ConnectionManagerWrapper manager : List.of(manager1, manager2)) {
        // Grab the server and the clients before closing, then check them after the close.
        NettyServer server = manager.connectionManager.server();
        Collection<NettyClient> clients = manager.connectionManager.clients();

        manager.close();

        assertFalse(server.isRunning());

        boolean clientsStopped = clients.stream().allMatch(NettyClient::isDisconnected);

        assertTrue(clientsStopped);
    }
}
/**
* Tests that after a channel was closed, a new channel is opened upon a request.
*
* <p>Scenario: open a channel from manager 1 to manager 2, close manager 2, send a message through the old sender,
* start manager 2 again on the same port with the same launch ID, open a channel again and send the message once more.
*
* @throws Exception If failed.
*/
@Test
public void testCanReconnectAfterFail() throws Exception {
String msgText = "test";
int port1 = 4000;
int port2 = 4001;
// The same launch ID is passed to both starts of the second manager.
UUID launchId2 = UUID.randomUUID();
ConnectionManagerWrapper manager1 = startManager(port1);
ConnectionManagerWrapper manager2 = startManager(port2, launchId2);
// NOTE(review): 300 s is far longer than the 3 s used by the other get() calls in this test; confirm it is intentional.
NettySender sender = manager1.openChannelTo(manager2).get(300, TimeUnit.SECONDS);
assertNotNull(sender);
TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
manager2.close();
// Sent after manager2 has been closed; the returned future is not awaited.
sender.send(new OutNetworkObject(testMessage, emptyList()));
manager2 = startManager(port2, launchId2);
// NOTE(review): the latch expects the test message twice although only one send is awaited below; presumably the
// message sent while manager2 was down is delivered after the reconnect as well. Confirm.
var latch = new CountDownLatch(2);
manager2.connectionManager.addListener((obj) -> {
if (testMessage.equals(obj.message())) {
latch.countDown();
}
});
sender = manager1.openChannelTo(manager2).get(3, TimeUnit.SECONDS);
assertNotNull(sender);
sender.send(new OutNetworkObject(testMessage, emptyList())).get(3, TimeUnit.SECONDS);
assertTrue(latch.await(3, TimeUnit.SECONDS));
}
/**
 * Tests that a connection to a misconfigured server results in a connection close and an exception on the client side.
 *
 * <p>The server is started with a serialization registry that throws on every serializer/deserializer request.
 * The attempt to open a channel must fail with an {@link ExecutionException} caused by an {@link IOException};
 * the test fails if no such exception is thrown.
 *
 * @throws Exception If failed.
 */
@Test
public void testConnectMisconfiguredServer() throws Exception {
    ConnectionManagerWrapper client = startManager(4000);

    try (ConnectionManagerWrapper server = startManager(4001, mockSerializationRegistry())) {
        // assertThrows (rather than try/catch) makes the test fail if the channel opens without an error.
        ExecutionException e = assertThrows(
                ExecutionException.class,
                () -> client.openChannelTo(server).get(3, TimeUnit.SECONDS)
        );

        assertThat(e.getCause(), isA(IOException.class));
    }
}
/**
 * Tests that a connection from a misconfigured client results in an exception.
 *
 * <p>The client is started with a serialization registry that throws on every serializer/deserializer request.
 * The attempt to open a channel must fail with an {@link ExecutionException} caused by a {@link DecoderException};
 * the test fails if no such exception is thrown.
 *
 * @throws Exception If failed.
 */
@Test
public void testConnectMisconfiguredClient() throws Exception {
    ConnectionManagerWrapper client = startManager(4000, mockSerializationRegistry());

    try (ConnectionManagerWrapper server = startManager(4001)) {
        // assertThrows (rather than try/catch) makes the test fail if the channel opens without an error.
        ExecutionException e = assertThrows(
                ExecutionException.class,
                () -> client.openChannelTo(server).get(3, TimeUnit.SECONDS)
        );

        assertThat(e.getCause(), isA(DecoderException.class));
    }
}
/**
 * Verifies that calling {@code start()} on an already started connection manager throws
 * {@link IgniteInternalException}.
 *
 * @throws Exception If failed.
 */
@Test
public void testStartTwice() throws Exception {
    // startManager() has already started the manager once.
    ConnectionManager alreadyStarted = startManager(4000).connectionManager;

    assertThrows(IgniteInternalException.class, () -> alreadyStarted.start());
}
/**
 * Verifies that closing a started connection manager two times in a row does not throw.
 *
 * @throws Exception If failed.
 */
@Test
public void testStopTwice() throws Exception {
    ConnectionManagerWrapper manager = startManager(4000);

    // No explicit assertion: the test passes as long as neither close() call throws.
    for (int attempt = 0; attempt < 2; attempt++) {
        manager.close();
    }
}
/**
 * Verifies that when two nodes open channels to each other, each of them ends up with exactly one open channel and
 * that both sides refer to the same connection.
 *
 * @throws Exception If failed.
 */
@RepeatedTest(100)
@Timeout(10)
public void testOneChannelLeftIfConnectToEachOther() throws Exception {
    try (
            ConnectionManagerWrapper first = startManager(4000);
            ConnectionManagerWrapper second = startManager(4001)
    ) {
        // Both connection attempts are requested before either one is awaited.
        CompletableFuture<NettySender> firstToSecond = first.openChannelTo(second).toCompletableFuture();
        CompletableFuture<NettySender> secondToFirst = second.openChannelTo(first).toCompletableFuture();

        NettySender initialSenderOnFirst = firstToSecond.get(1, TimeUnit.SECONDS);
        NettySender initialSenderOnSecond = secondToFirst.get(1, TimeUnit.SECONDS);

        assertTrue(initialSenderOnFirst.isOpen());
        assertTrue(initialSenderOnSecond.isOpen());

        boolean oneChannelOnEachSide = waitForCondition(
                () -> singleOpenedChannel(first) && singleOpenedChannel(second),
                TimeUnit.SECONDS.toMillis(1)
        );

        assertTrue(oneChannelOnEachSide);

        // Ask for the channels once more; the returned futures are asserted to be completed already.
        CompletableFuture<NettySender> survivorOnFirstFut = first.openChannelTo(second).toCompletableFuture();
        CompletableFuture<NettySender> survivorOnSecondFut = second.openChannelTo(first).toCompletableFuture();

        assertThat(survivorOnFirstFut, is(completedFuture()));
        assertThat(survivorOnSecondFut, is(completedFuture()));

        NettySender survivorOnFirst = survivorOnFirstFut.getNow(null);
        NettySender survivorOnSecond = survivorOnSecondFut.getNow(null);

        InetSocketAddress localOfFirst = (InetSocketAddress) survivorOnFirst.channel().localAddress();
        InetSocketAddress remoteOfFirst = (InetSocketAddress) survivorOnFirst.channel().remoteAddress();

        InetSocketAddress localOfSecond = (InetSocketAddress) survivorOnSecond.channel().localAddress();
        InetSocketAddress remoteOfSecond = (InetSocketAddress) survivorOnSecond.channel().remoteAddress();

        // Only compare ports because hosts may look different, eg localhost and 0.0.0.0. They are technically not same,
        // although equal.
        assertEquals(localOfFirst.getPort(), remoteOfSecond.getPort());
        assertEquals(localOfSecond.getPort(), remoteOfFirst.getPort());
    }
}
/**
 * Checks whether the manager currently has exactly one channel and that channel is open.
 *
 * @param manager Connection manager wrapper to inspect.
 * @return {@code true} if there is exactly one channel and it is open.
 */
private static boolean singleOpenedChannel(ConnectionManagerWrapper manager) {
    Iterator<NettySender> senders = manager.channels().values().iterator();

    if (!senders.hasNext()) {
        return false;
    }

    NettySender onlyCandidate = senders.next();

    return onlyCandidate.isOpen() && !senders.hasNext();
}
/**
 * Verifies that the send future does not complete within 100 ms while acknowledgements from the receiving side are
 * silenced, and completes successfully once silencing is stopped and one more message is sent.
 *
 * @throws Exception If failed.
 */
@Test
public void sendFutureOfMessageNeedingAckCompletesWhenMessageGetsAcknowledged() throws Exception {
    try (
            ConnectionManagerWrapper opener = startManager(4000);
            ConnectionManagerWrapper acceptor = startManager(4001)
    ) {
        NettySender channel = opener.openChannelTo(acceptor).toCompletableFuture().get(10, TimeUnit.SECONDS);

        // The silencer is installed on the acceptor's channels, so the channel must be present there first.
        waitTillChannelAppearsInMapOnAcceptor(channel, opener, acceptor);

        OutgoingAcknowledgementSilencer silencer = dropAcksFrom(acceptor);

        OutNetworkObject outgoing = new OutNetworkObject(emptyTestMessage, emptyList(), true);
        CompletableFuture<Void> sent = channel.send(outgoing);

        assertThat(sent, willTimeoutIn(100, TimeUnit.MILLISECONDS));

        silencer.stopSilencing();

        provokeAckFor(channel);

        assertThat(sent, willCompleteSuccessfully());
    }
}
/**
 * Waits up to 10 seconds until the acceptor's channel map contains a sender whose consistent ID equals the opener's
 * consistent ID and whose channel ID equals the channel ID of the given sender; fails the test otherwise.
 *
 * @param senderFromOpener Sender obtained on the opener side.
 * @param opener Manager that opened the channel.
 * @param acceptor Manager that accepted the channel.
 * @throws InterruptedException If interrupted while waiting.
 */
private static void waitTillChannelAppearsInMapOnAcceptor(
        NettySender senderFromOpener,
        ConnectionManagerWrapper opener,
        ConnectionManagerWrapper acceptor
) throws InterruptedException {
    boolean appeared = waitForCondition(
            () -> {
                for (NettySender acceptorSender : acceptor.channels().values()) {
                    if (acceptorSender.consistentId().equals(opener.connectionManager.consistentId())
                            && acceptorSender.channelId() == senderFromOpener.channelId()) {
                        return true;
                    }
                }

                return false;
            },
            TimeUnit.SECONDS.toMillis(10)
    );

    assertTrue(appeared, "Did not observe the sender appearing in the acceptor's sender map in time");
}
/**
 * Verifies that the futures of two messages sent one after another complete in the order the messages were sent.
 *
 * @throws Exception If failed.
 */
@Test
public void sendFuturesCompleteInSendOrder() throws Exception {
    try (
            ConnectionManagerWrapper opener = startManager(4000);
            ConnectionManagerWrapper acceptor = startManager(4001)
    ) {
        NettySender channel = opener.openChannelTo(acceptor).toCompletableFuture().get(10, TimeUnit.SECONDS);

        waitTillChannelAppearsInMapOnAcceptor(channel, opener, acceptor);

        OutgoingAcknowledgementSilencer silencer = dropAcksFrom(acceptor);

        // Each future records its ordinal on completion; the list therefore reflects the completion order.
        List<Integer> completionOrder = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> firstSent = channel
                .send(new OutNetworkObject(emptyTestMessage, emptyList(), true))
                .whenComplete((res, ex) -> completionOrder.add(1));
        CompletableFuture<Void> secondSent = channel
                .send(new OutNetworkObject(emptyTestMessage, emptyList(), true))
                .whenComplete((res, ex) -> completionOrder.add(2));

        silencer.stopSilencing();

        provokeAckFor(channel);

        CompletableFuture<Void> bothSent = CompletableFuture.allOf(firstSent, secondSent);

        assertThat(bothSent, willCompleteSuccessfully());
        assertThat(completionOrder, contains(1, 2));
    }
}
/**
 * Verifies that the send future of a {@link HandshakeFinishMessage} completes successfully even though
 * acknowledgements from the receiving side are silenced for the whole test.
 *
 * @throws Exception If failed.
 */
@Test
public void sendFutureOfMessageNotNeedingAckCompletesWhenMessageGetsWritten() throws Exception {
    try (
            ConnectionManagerWrapper opener = startManager(4000);
            ConnectionManagerWrapper acceptor = startManager(4001)
    ) {
        NettySender channel = opener.openChannelTo(acceptor).toCompletableFuture().get(10, TimeUnit.SECONDS);

        waitTillChannelAppearsInMapOnAcceptor(channel, opener, acceptor);

        // Silencing is never stopped in this test.
        dropAcksFrom(acceptor);

        NetworkMessagesFactory networkMessages = new NetworkMessagesFactory();
        HandshakeFinishMessage messageNotNeedingAck = networkMessages.handshakeFinishMessage().build();

        OutNetworkObject outgoing = new OutNetworkObject(messageNotNeedingAck, emptyList(), true);
        CompletableFuture<Void> sent = channel.send(outgoing);

        assertThat(sent, willCompleteSuccessfully());
    }
}
/**
 * Installs an {@link OutgoingAcknowledgementSilencer} on all channels currently present in the given manager.
 *
 * @param connectionManagerWrapper Manager whose channels get the silencer.
 * @return The installed silencer.
 * @throws InterruptedException If interrupted during installation.
 */
private static OutgoingAcknowledgementSilencer dropAcksFrom(ConnectionManagerWrapper connectionManagerWrapper)
        throws InterruptedException {
    Collection<NettySender> currentSenders = connectionManagerWrapper.channels().values();

    return OutgoingAcknowledgementSilencer.installOn(currentSenders);
}
/**
 * Sends one more empty test message through the given sender without waiting for the result.
 *
 * <p>NOTE(review): presumably the acknowledgement triggered by this extra message also covers the messages sent
 * earlier; confirm against the acknowledgement protocol.
 *
 * @param sender1 Sender to send the extra message through.
 */
private void provokeAckFor(NettySender sender1) {
    OutNetworkObject extraMessage = new OutNetworkObject(emptyTestMessage, emptyList(), true);

    sender1.send(extraMessage);
}
/**
 * Builds a mocked {@link MessageSerializationRegistry} whose {@code createSerializer} and {@code createDeserializer}
 * throw a {@link RuntimeException} for any pair of short arguments.
 *
 * @return Mocked registry.
 */
private static MessageSerializationRegistry mockSerializationRegistry() {
    MessageSerializationRegistry brokenRegistry = mock(MessageSerializationRegistry.class);

    when(brokenRegistry.createSerializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
    when(brokenRegistry.createDeserializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);

    return brokenRegistry;
}
/**
 * Starts a {@link ConnectionManager} on the given port using the default serialization registry.
 *
 * @param port Port for the connection manager to listen on.
 * @return Wrapper around the started connection manager.
 * @throws Exception If the manager fails to start.
 */
private ConnectionManagerWrapper startManager(int port) throws Exception {
    MessageSerializationRegistry defaultRegistry = defaultSerializationRegistry();

    return startManager(port, defaultRegistry);
}
/**
 * Starts a {@link ConnectionManager} on the given port with the given serialization registry and a random launch ID.
 *
 * @param port Port for the connection manager to listen on.
 * @param registry Serialization registry.
 * @return Wrapper around the started connection manager.
 * @throws Exception If the manager fails to start.
 */
private ConnectionManagerWrapper startManager(int port, MessageSerializationRegistry registry) throws Exception {
    UUID freshLaunchId = UUID.randomUUID();

    return startManager(port, freshLaunchId, registry);
}
/**
 * Starts a {@link ConnectionManager} on the given port with the given launch ID and the default serialization registry.
 *
 * @param port Port for the connection manager to listen on.
 * @param launchId Launch ID for the manager's local node.
 * @return Wrapper around the started connection manager.
 * @throws Exception If the manager fails to start.
 */
private ConnectionManagerWrapper startManager(int port, UUID launchId) throws Exception {
    MessageSerializationRegistry defaultRegistry = defaultSerializationRegistry();

    return startManager(port, launchId, defaultRegistry);
}
/**
* Creates and starts a {@link ConnectionManager} listening on the given port, configured with the provided serialization registry.
*
* <p>The started manager is wrapped together with its {@link NettyBootstrapFactory} and added to {@code startedManagers}.
*
* @param port Port for the connection manager to listen on.
* @param launchId Launch ID; its string form is passed as the first argument of the {@link ClusterNodeImpl} set as the local node.
* @param registry Serialization registry.
* @return Connection manager.
* @throws Exception If creating or starting the manager fails; the bootstrap factory is stopped before the exception is rethrown.
*/
private ConnectionManagerWrapper startManager(int port, UUID launchId, MessageSerializationRegistry registry) throws Exception {
String consistentId = testNodeName(testInfo, port);
// The shared configuration is updated (and the update awaited) before its value is read below.
networkConfiguration.port().update(port).join();
NetworkView cfg = networkConfiguration.value();
NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
assertThat(bootstrapFactory.startAsync(), willCompleteSuccessfully());
try {
var manager = new ConnectionManager(
cfg,
new SerializationService(registry, mock(UserObjectSerializationContext.class)),
consistentId,
bootstrapFactory,
new AllIdsAreFresh(),
mock(FailureProcessor.class)
);
manager.start();
// The local node is set after start(): its address uses the host name of the manager's local address.
manager.setLocalNode(new ClusterNodeImpl(
launchId.toString(),
consistentId,
new NetworkAddress(manager.localAddress().getHostName(), port)
));
var wrapper = new ConnectionManagerWrapper(manager, bootstrapFactory);
// Registered so that tearDown() closes it even if the test does not.
startedManagers.add(wrapper);
return wrapper;
} catch (Exception e) {
// No wrapper was registered for tearDown() on this path, so the already started factory is stopped here.
assertThat(bootstrapFactory.stopAsync(), willCompleteSuccessfully());
throw e;
}
}
/**
 * Pairs a {@link ConnectionManager} with the {@link NettyBootstrapFactory} it was created with, so that both are
 * shut down by a single {@link #close()} call.
 */
private static class ConnectionManagerWrapper implements AutoCloseable {
    /** Wrapped connection manager. */
    final ConnectionManager connectionManager;

    /** Bootstrap factory that was passed to the wrapped connection manager. */
    private final NettyBootstrapFactory bootstrapFactory;

    /**
     * Constructor.
     *
     * @param connectionManager Connection manager.
     * @param nettyFactory Bootstrap factory to stop when the wrapper is closed.
     */
    ConnectionManagerWrapper(ConnectionManager connectionManager, NettyBootstrapFactory nettyFactory) {
        this.connectionManager = connectionManager;
        this.bootstrapFactory = nettyFactory;
    }

    /**
     * Passes three actions to {@code closeAll}: initiate stopping of the manager, stop the manager, stop the
     * bootstrap factory.
     */
    @Override
    public void close() throws Exception {
        closeAll(
                connectionManager::initiateStopping,
                connectionManager::stop,
                this::stopBootstrapFactory
        );
    }

    /** Stops the bootstrap factory and asserts that the stop completes successfully. */
    private void stopBootstrapFactory() {
        assertThat(bootstrapFactory.stopAsync(), willCompleteSuccessfully());
    }

    /**
     * Requests a {@link ChannelType#DEFAULT} channel from this manager to the recipient, addressed by the
     * recipient's consistent ID and local address.
     *
     * @param recipient Manager to connect to.
     * @return Future of the sender.
     */
    OrderingFuture<NettySender> openChannelTo(ConnectionManagerWrapper recipient) {
        ConnectionManager target = recipient.connectionManager;

        return connectionManager.channel(target.consistentId(), ChannelType.DEFAULT, target.localAddress());
    }

    /**
     * Returns the channel map of the wrapped connection manager.
     *
     * @return Channels keyed by connector key.
     */
    Map<ConnectorKey<String>, NettySender> channels() {
        return connectionManager.channels();
    }
}
}