blob: 4ef85272ac8ae1f7c3b85cdc1a3c4ef4d6063d18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.recovery;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeAction;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
import static org.apache.ignite.internal.network.recovery.ITRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Recovery protocol handshake tests.
*/
public class ITRecoveryHandshakeTest {
/** Connection managers started by the {@code startManager} helpers; each of them is stopped in {@link #tearDown()}. */
private final List<ConnectionManager> startedManagers = new ArrayList<>();
/** Factory used to build the test message that is sent over an established channel. */
private final TestMessagesFactory messageFactory = new TestMessagesFactory();
/**
 * Stops every connection manager that was registered in {@link #startedManagers} during the test.
 */
@AfterEach
final void tearDown() {
    for (ConnectionManager manager : startedManagers) {
        manager.stop();
    }
}
/**
 * Runs a single handshake scenario in which the server side, the client side, or both may be
 * configured to fail at some stage of the handshake.
 *
 * <p>The first manager (port 4000) uses the scenario's server failure stage, the second one (port 4001)
 * uses the scenario's client failure stage and opens a channel to the first. When a failure is configured
 * the channel must not be obtained; otherwise a message is sent through it and the reverse lookup on the
 * first manager must yield a channel whose remote address equals the local address of the opened one.
 *
 * @param scenario Handshake scenario.
 * @throws Exception If failed.
 */
@ParameterizedTest
@MethodSource("handshakeScenarios")
public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
    ConnectionManager manager1 = startManager(4000, scenario.serverFailAt, CLIENT_DOESNT_FAIL);
    ConnectionManager manager2 = startManager(4001, SERVER_DOESNT_FAIL, scenario.clientFailAt);

    // The handshake may only succeed when neither side is configured to fail.
    boolean failureExpected =
        scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL;

    NettySender from2to1;

    try {
        from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
    } catch (Exception e) {
        if (!failureExpected) {
            // Nothing was supposed to go wrong, so report the cause.
            Assertions.fail(e);
        }

        // A failure was configured and the channel could not be obtained: the scenario is done.
        return;
    }

    if (failureExpected) {
        Assertions.fail("Handshake should've failed");
    }

    assertNotNull(from2to1);

    // Ensure the handshake has finished on both sides.
    NetworkMessage probe = messageFactory.testMessage().msg("test").build();
    from2to1.send(probe).get(3, TimeUnit.SECONDS);

    NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);

    assertNotNull(from1to2);
    assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
}
/**
 * Tests a special handshake scenario: the client assumes that the handshake has finished, but the server
 * fails when it receives the client's response. The server is then expected to close the connection, so
 * the test waits (up to three seconds) for the close future of the client's channel to complete.
 *
 * @throws Exception If failed.
 */
@Test
public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
    ConnectionManager failingServer = startManager(4000, SERVER_CLIENT_RESPONDED, CLIENT_DOESNT_FAIL);
    ConnectionManager client = startManager(4001, SERVER_DOESNT_FAIL, CLIENT_DOESNT_FAIL);

    NettySender clientToServer = client
        .channel(failingServer.consistentId(), failingServer.getLocalAddress())
        .get(3, TimeUnit.SECONDS);

    // The channel was handed out to the client; now it has to be closed within the timeout.
    clientToServer.channel().closeFuture().get(3, TimeUnit.SECONDS);
}
/**
 * Tests that a single server can accept several incoming connections and complete the handshake
 * with each of them.
 *
 * @throws Exception If failed.
 */
@Test
public void testHandshakeWithMultipleClients() throws Exception {
    ConnectionManager server = startManager(4000);

    int clientCount = 10;

    List<ConnectionManager> clients = new ArrayList<>();

    for (int idx = 0; idx < clientCount; idx++) {
        clients.add(startManager(4001 + idx));
    }

    // Client's consistent id -> channel opened by that client to the server.
    Map<String, NettySender> channelsToServer = new HashMap<>();

    for (ConnectionManager client : clients) {
        String clientId = client.consistentId();

        NettySender toServer = client.channel(server.consistentId(), server.getLocalAddress()).get(3, TimeUnit.SECONDS);

        channelsToServer.put(clientId, toServer);
    }

    // Give the server up to three seconds to register all the inbound channels.
    long timeoutMillis = TimeUnit.SECONDS.toMillis(3);

    assertTrue(waitForCondition(() -> server.channels().size() == clientCount, timeoutMillis));

    Map<String, NettySender> serverChannels = server.channels();

    // Assert that server's channels are inbound connections opened from clients.
    for (Map.Entry<String, NettySender> entry : channelsToServer.entrySet()) {
        NettySender toServer = entry.getValue();
        NettySender toClient = serverChannels.get(entry.getKey());

        assertNotNull(toClient);
        assertEquals(toServer.channel().localAddress(), toClient.channel().remoteAddress());
    }
}
/**
 * Generates handshake scenarios for {@link #testHandshakeScenario(HandshakeScenario)}: every combination
 * of a server failure stage and a client failure stage, except for the single combination
 * "server fails on client's response while the client doesn't fail", which is covered by
 * {@link #testHandshakeFailsOnServerWhenClientResponded()}.
 *
 * <p>The combination where neither side fails is included, so the successful handshake path is exercised too.
 *
 * @return Handshake scenarios.
 */
private static List<HandshakeScenario> handshakeScenarios() {
    List<HandshakeScenario> res = new ArrayList<>();

    for (ServerStageFail serverOpt : ServerStageFail.values()) {
        for (ClientStageFail clientOpt : ClientStageFail.values()) {
            // Only this exact pair is handled in a separate test. Skipping on either condition alone
            // would also drop the happy path and all the server-only failures.
            boolean handledSeparately = serverOpt == SERVER_CLIENT_RESPONDED && clientOpt == CLIENT_DOESNT_FAIL;

            if (!handledSeparately) {
                res.add(new HandshakeScenario(serverOpt, clientOpt));
            }
        }
    }

    return res;
}
/**
 * Handshake scenario: a pair of stages at which the server and the client handshakes should fail.
 */
private static class HandshakeScenario {
    /** Stage to fail server handshake at. */
    private final ServerStageFail serverFailAt;

    /** Stage to fail client handshake at. */
    private final ClientStageFail clientFailAt;

    /**
     * Constructor.
     *
     * @param serverFailAt Stage to fail server handshake at.
     * @param clientFailAt Stage to fail client handshake at.
     */
    private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
        this.serverFailAt = serverFailAt;
        this.clientFailAt = clientFailAt;
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return "server=" + serverFailAt + ", client=" + clientFailAt;
    }
}
/**
 * {@link RecoveryServerHandshakeManager} that can fail at a specific stage of the handshake.
 * At the configured stage it completes the handshake future exceptionally and returns
 * {@link HandshakeAction#FAIL}; at any other stage it delegates to the parent implementation.
 */
static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
    /** At what stage to fail the handshake. */
    private final ServerStageFail failAtStage;

    /**
     * Constructor.
     *
     * @param launchId Launch id, passed to the parent constructor.
     * @param consistentId Consistent id, passed to the parent constructor.
     * @param failAtStage At what stage to fail the handshake.
     * @param messageFactory Message factory, passed to the parent constructor.
     */
    private FailingRecoveryServerHandshakeManager(
        UUID launchId,
        String consistentId,
        ServerStageFail failAtStage,
        NetworkMessagesFactory messageFactory
    ) {
        super(launchId, consistentId, messageFactory);

        this.failAtStage = failAtStage;
    }

    /** {@inheritDoc} */
    @Override
    public HandshakeAction init(Channel channel) {
        if (failAtStage != ServerStageFail.SERVER_INIT) {
            return super.init(channel);
        }

        return injectFailure();
    }

    /** {@inheritDoc} */
    @Override
    public HandshakeAction onConnectionOpen(Channel channel) {
        if (failAtStage != ServerStageFail.SERVER_CONNECTION_OPENED) {
            return super.onConnectionOpen(channel);
        }

        return injectFailure();
    }

    /** {@inheritDoc} */
    @Override
    public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
        if (failAtStage != ServerStageFail.SERVER_CLIENT_RESPONDED) {
            return super.onMessage(channel, message);
        }

        return injectFailure();
    }

    /**
     * Completes the handshake future exceptionally.
     *
     * @return {@link HandshakeAction#FAIL}.
     */
    private HandshakeAction injectFailure() {
        handshakeFuture().completeExceptionally(new RuntimeException());

        return HandshakeAction.FAIL;
    }

    /** Server handshake stage to fail at. */
    enum ServerStageFail {
        /** Don't fail at all. */
        SERVER_DOESNT_FAIL,

        /** Fail on init. */
        SERVER_INIT,

        /** Fail on connection open. */
        SERVER_CONNECTION_OPENED,

        /** Fail on client response. */
        SERVER_CLIENT_RESPONDED
    }
}
/**
 * {@link RecoveryClientHandshakeManager} that can fail at a specific stage of the handshake.
 * At the configured stage it completes the handshake future exceptionally and returns
 * {@link HandshakeAction#FAIL}; at any other stage it delegates to the parent implementation.
 */
static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
    /** At what stage to fail the handshake. */
    private final ClientStageFail failAtStage;

    /**
     * Constructor.
     *
     * @param launchId Launch id, passed to the parent constructor.
     * @param consistentId Consistent id, passed to the parent constructor.
     * @param failAtStage At what stage to fail the handshake.
     * @param messageFactory Message factory, passed to the parent constructor.
     */
    private FailingRecoveryClientHandshakeManager(
        UUID launchId,
        String consistentId,
        ClientStageFail failAtStage,
        NetworkMessagesFactory messageFactory
    ) {
        super(launchId, consistentId, messageFactory);

        this.failAtStage = failAtStage;
    }

    /** {@inheritDoc} */
    @Override
    public HandshakeAction init(Channel channel) {
        if (failAtStage != ClientStageFail.CLIENT_INIT) {
            return super.init(channel);
        }

        return injectFailure();
    }

    /** {@inheritDoc} */
    @Override
    public HandshakeAction onConnectionOpen(Channel channel) {
        if (failAtStage != ClientStageFail.CLIENT_CONNECTION_OPENED) {
            return super.onConnectionOpen(channel);
        }

        return injectFailure();
    }

    /** {@inheritDoc} */
    @Override
    public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
        if (failAtStage != ClientStageFail.CLIENT_SERVER_RESPONDED) {
            return super.onMessage(channel, message);
        }

        return injectFailure();
    }

    /**
     * Completes the handshake future exceptionally.
     *
     * @return {@link HandshakeAction#FAIL}.
     */
    private HandshakeAction injectFailure() {
        handshakeFuture().completeExceptionally(new RuntimeException());

        return HandshakeAction.FAIL;
    }

    /** Client handshake stage to fail at. */
    enum ClientStageFail {
        /** Don't fail at all. */
        CLIENT_DOESNT_FAIL,

        /** Fail on init. */
        CLIENT_INIT,

        /** Fail on connection open. */
        CLIENT_CONNECTION_OPENED,

        /** Fail on server response. */
        CLIENT_SERVER_RESPONDED
    }
}
/**
 * Creates and starts a {@link ConnectionManager} whose server and client handshake managers can be made
 * to fail at the given stages, and registers it in the {@link #startedManagers} list.
 * A random launch id and a random consistent id are generated for every manager.
 *
 * @param port Port passed to the {@link ConnectionManager} constructor.
 * @param serverHandshakeFailAt At what stage to fail server handshake.
 * @param clientHandshakeFailAt At what stage to fail client handshake.
 * @return Started connection manager.
 */
private ConnectionManager startManager(
    int port,
    ServerStageFail serverHandshakeFailAt,
    ClientStageFail clientHandshakeFailAt
) {
    TestMessageSerializationRegistryImpl serializationRegistry = new TestMessageSerializationRegistryImpl();
    NetworkMessagesFactory networkMessages = new NetworkMessagesFactory();

    UUID launchId = UUID.randomUUID();
    String consistentId = UUID.randomUUID().toString();

    ConnectionManager connectionManager = new ConnectionManager(
        port,
        serializationRegistry,
        consistentId,
        () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, networkMessages),
        () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, networkMessages)
    );

    connectionManager.start();

    // Remember the manager so that it gets stopped after the test.
    startedManagers.add(connectionManager);

    return connectionManager;
}
/**
 * Creates and starts a {@link ConnectionManager} with the regular (non-failing) recovery handshake managers
 * and registers it in the {@link #startedManagers} list.
 * A random launch id and a random consistent id are generated for every manager.
 *
 * @param port Port passed to the {@link ConnectionManager} constructor.
 * @return Started connection manager.
 */
private ConnectionManager startManager(int port) {
    TestMessageSerializationRegistryImpl serializationRegistry = new TestMessageSerializationRegistryImpl();
    NetworkMessagesFactory networkMessages = new NetworkMessagesFactory();

    UUID launchId = UUID.randomUUID();
    String consistentId = UUID.randomUUID().toString();

    ConnectionManager connectionManager = new ConnectionManager(
        port,
        serializationRegistry,
        consistentId,
        () -> new RecoveryServerHandshakeManager(launchId, consistentId, networkMessages),
        () -> new RecoveryClientHandshakeManager(launchId, consistentId, networkMessages)
    );

    connectionManager.start();

    // Remember the manager so that it gets stopped after the test.
    startedManagers.add(connectionManager);

    return connectionManager;
}
}