blob: 058d6751aa6f8f5836f1d91473af7ee30c0ce314 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.netty;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ServerChannel;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.ConnectException;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.serialization.MessageDeserializer;
import org.apache.ignite.internal.network.serialization.MessageMappingException;
import org.apache.ignite.internal.network.serialization.MessageReader;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.opentest4j.AssertionFailedError;
/**
 * Tests for {@link NettyServer}.
 *
 * <p>Every test stores the server and the bootstrap factory it creates in the instance fields so that
 * {@link #tearDown()} can release them regardless of the test outcome.
 */
@ExtendWith(ConfigurationExtension.class)
public class NettyServerTest extends BaseIgniteAbstractTest {
    /** Bootstrap factory; assigned by the test that creates it and stopped in {@link #tearDown()}. */
    private NettyBootstrapFactory bootstrapFactory;

    /** Server under test; assigned by the test that creates it and stopped in {@link #tearDown()}. */
    private NettyServer server;

    /** Server configuration. */
    @InjectConfiguration
    private NetworkConfiguration serverCfg;

    /**
     * Stops the server and the bootstrap factory if the test has created them.
     *
     * @throws Exception If closing any of the resources failed.
     */
    @AfterEach
    final void tearDown() throws Exception {
        closeAll(
                server == null ? null : () -> server.stop().join(),
                bootstrapFactory == null ? null : () -> assertThat(bootstrapFactory.stopAsync(), willCompleteSuccessfully())
        );
    }

    /**
     * Tests a successful server start scenario.
     */
    @Test
    public void testSuccessfulServerStart() {
        server = getServer(true);

        assertTrue(server.isRunning());
    }

    /**
     * Tests a graceful server shutdown scenario.
     */
    @Test
    public void testServerGracefulShutdown() {
        server = getServer(true);

        server.stop().join();
    }

    /**
     * Tests a scenario where a server is stopped before a server socket is successfully bound.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testServerStoppedBeforeStarted() throws Exception {
        // NOTE(review): this channel and its promise are never handed to the server created below, so the
        // final assertion only checks the embedded channel itself - confirm whether this fixture is still needed.
        var channel = new EmbeddedServerChannel();

        ChannelPromise future = channel.newPromise();

        server = getServer(false);

        CompletableFuture<Void> stop = server.stop();

        future.setSuccess(null);

        stop.get(3, TimeUnit.SECONDS);

        assertFalse(channel.finish());
    }

    /**
     * Tests that a {@link NettyServer#start} method can be called only once.
     */
    @Test
    public void testStartTwice() {
        server = getServer(true);

        assertThrows(IgniteInternalException.class, server::start);
    }

    /**
     * Tests that bootstrap tries to bind to address specified in configuration.
     */
    @Test
    public void testBindWithAddress() {
        String host = "127.0.0.7";

        assertThat(serverCfg.listenAddress().update(host), willCompleteSuccessfully());

        // Keep the reference in the field: otherwise tearDown() cannot stop the server and the port stays bound.
        server = getServer(true);

        int port = serverCfg.port().value();

        // The configured address must accept connections...
        assertDoesNotThrow(() -> new Socket(host, port).close());

        // ...while another loopback address on the same port must refuse them.
        assertThrows(ConnectException.class, () -> new Socket("127.0.0.1", port).close());
    }

    /**
     * Tests the case when couldn't bind to the address provided.
     */
    @Test
    public void testBindUnknownAddressFailed() {
        String address = "unknown-address";

        assertThat(serverCfg.listenAddress().update(address), willCompleteSuccessfully());

        // getServer(true) reports a start failure through fail(e), hence the AssertionFailedError wrapping the cause.
        AssertionFailedError e = assertThrows(AssertionFailedError.class, () -> getServer(true));

        String expectedError = String.format("Address %s:%d is not available", address, serverCfg.port().value());

        assertThat(e.getCause().getMessage(), containsString(expectedError));
    }

    /**
     * Tests that handshake manager is invoked upon a client connecting to a server.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testHandshakeManagerInvoked() throws Exception {
        HandshakeManager handshakeManager = mockHandshakeManager();

        MessageSerializationRegistry registry = mock(MessageSerializationRegistry.class);

        when(registry.createDeserializer(anyShort(), anyShort()))
                .thenReturn(new MessageDeserializer<>() {
                    /** {@inheritDoc} */
                    @Override
                    public boolean readMessage(MessageReader reader) throws MessageMappingException {
                        return true;
                    }

                    /** {@inheritDoc} */
                    @Override
                    public Class<NetworkMessage> klass() {
                        return NetworkMessage.class;
                    }

                    /** {@inheritDoc} */
                    @Override
                    public NetworkMessage getMessage() {
                        return mock(NetworkMessage.class);
                    }
                });

        bootstrapFactory = new NettyBootstrapFactory(serverCfg, "");

        assertThat(bootstrapFactory.startAsync(), willCompleteSuccessfully());

        server = new NettyServer(
                serverCfg.value(),
                () -> handshakeManager,
                (message) -> {},
                new SerializationService(registry, mock(UserObjectSerializationContext.class)),
                bootstrapFactory
        );

        server.start().get(3, TimeUnit.SECONDS);

        // The client event loop group owns threads, so it has to be shut down even if the test fails.
        var clientGroup = new NioEventLoopGroup();

        try {
            CompletableFuture<Channel> connectFut = NettyUtils.toChannelCompletableFuture(
                    new Bootstrap()
                            .channel(NioSocketChannel.class)
                            .group(clientGroup)
                            .handler(new ChannelInitializer<>() {
                                /** {@inheritDoc} */
                                @Override
                                protected void initChannel(Channel ch) {
                                    // No-op.
                                }
                            })
                            .connect(server.address())
            );

            Channel channel = connectFut.get(3, TimeUnit.SECONDS);

            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

            // One message only. 1 byte for group type, 1 byte for message type, 1 byte for payload.
            for (int i = 0; i < 3; i++) {
                buffer.writeByte(1);
            }

            ChannelFuture writeFuture = channel.writeAndFlush(buffer);

            writeFuture.get(3, TimeUnit.SECONDS);

            channel.close().get(3, TimeUnit.SECONDS);

            InOrder order = Mockito.inOrder(handshakeManager);

            order.verify(handshakeManager, timeout()).onInit(any());
            order.verify(handshakeManager, timeout()).onConnectionOpen();
            order.verify(handshakeManager, timeout()).localHandshakeFuture();
            order.verify(handshakeManager, timeout()).onMessage(any());
        } finally {
            // Non-throwing await: a slow shutdown must not mask the actual test failure.
            clientGroup.shutdownGracefully(0, 3, TimeUnit.SECONDS).awaitUninterruptibly(3, TimeUnit.SECONDS);
        }
    }

    /**
     * Creates a {@link HandshakeManager} mock whose local and final handshake futures are already completed
     * with a mocked {@link NettySender}.
     *
     * @return Handshake manager mock.
     */
    private HandshakeManager mockHandshakeManager() {
        HandshakeManager handshakeManager = mock(HandshakeManager.class);

        when(handshakeManager.localHandshakeFuture()).thenReturn(completedFuture(mock(NettySender.class)));
        when(handshakeManager.finalHandshakeFuture()).thenReturn(completedFuture(mock(NettySender.class)));

        return handshakeManager;
    }

    /**
     * Returns verification mode for a one call with a 3-second timeout.
     *
     * @return Verification mode for a one call with a 3-second timeout.
     */
    private static VerificationMode timeout() {
        return Mockito.timeout(TimeUnit.SECONDS.toMillis(3));
    }

    /**
     * Creates a server on top of a newly started {@link NettyBootstrapFactory} (stored in
     * {@link #bootstrapFactory}) and waits up to 3 seconds for the server start to complete.
     *
     * @param shouldStart {@code true} if a server should start successfully; in that case a start failure
     *      fails the test. If {@code false}, a start failure is ignored and the server is returned as is.
     * @return NettyServer.
     */
    private NettyServer getServer(boolean shouldStart) {
        bootstrapFactory = new NettyBootstrapFactory(serverCfg, "");

        assertThat(bootstrapFactory.startAsync(), willCompleteSuccessfully());

        MessageSerializationRegistry registry = mock(MessageSerializationRegistry.class);

        var server = new NettyServer(
                serverCfg.value(),
                this::mockHandshakeManager,
                (message) -> {},
                new SerializationService(registry, mock(UserObjectSerializationContext.class)),
                bootstrapFactory
        );

        try {
            server.start().get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (shouldStart) {
                fail(e);
            }

            // Otherwise the caller explicitly tolerates a failed start, so the error is dropped on purpose.
        }

        return server;
    }

    /** Server channel on top of the {@link EmbeddedChannel}. */
    private static class EmbeddedServerChannel extends EmbeddedChannel implements ServerChannel {
        // No-op.
    }
}