blob: d07ca47bc21009991d869fb1e6f6b0d224e81e69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestThread;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
/**
* Client-based discovery SPI test with non-Ignite servers.
*/
public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest {
    /** First port of the IP finder range; the first non-Ignite server listens on it. */
    private static final int SERVER_PORT = 47500;

    /** Last port of the IP finder range; the second non-Ignite server listens on it. */
    private static final int LAST_SERVER_PORT = SERVER_PORT + 5;

    /** Non-Ignite server sockets opened by the current test. */
    private final List<ServerSocket> srvSocks = new ArrayList<>();

    /** Count of connections accepted by the non-Ignite servers. */
    private final AtomicInteger connCnt = new AtomicInteger(0);

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();

        // A single port range that includes both ports occupied by the non-Ignite servers.
        ipFinder.setAddresses(Collections.singleton("127.0.0.1:" + SERVER_PORT + ".." + LAST_SERVER_PORT));

        cfg.setDiscoverySpi(new TcpDiscoverySpiWithOrderedIps().setIpFinder(ipFinder));

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        try {
            stopTcpThreads();
        }
        finally {
            // Grids must be stopped even if closing one of the test server sockets failed.
            stopAllGrids();

            super.afterTest();
        }
    }

    /**
     * Opens a server socket on {@code 127.0.0.1:port} and starts a thread that accepts connections on it.
     * Every accepted connection increments {@link #connCnt} and is handed to a new worker thread.
     *
     * @param workerFactory Factory producing a worker for each accepted client socket.
     * @param port Port to listen on.
     * @throws Exception If the server socket could not be opened.
     */
    private void startTcpThread(final WorkerFactory workerFactory, final int port) throws Exception {
        final ServerSocket srvSock = new ServerSocket(port, 10, InetAddress.getByName("127.0.0.1"));

        srvSocks.add(srvSock);

        new GridTestThread(new Runnable() {
            @Override public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Socket clientSock = srvSock.accept();

                        connCnt.getAndIncrement();

                        // Create a new thread for socket connection.
                        new GridTestThread(workerFactory.newWorker(clientSock)).start();
                    }
                }
                catch (Exception e) {
                    // An exception after the server socket was closed is the normal way to stop this loop.
                    if (!srvSock.isClosed())
                        log.error("Unexpected error", e);
                }
            }
        }).start();
    }

    /**
     * Closes all server sockets opened by {@link #startTcpThread(WorkerFactory, int)} and forgets them.
     * All sockets are attempted even if closing one of them fails.
     *
     * @throws IOException The first close failure, with any later failures attached as suppressed.
     */
    private void stopTcpThreads() throws IOException {
        IOException err = null;

        for (ServerSocket srvSock : srvSocks) {
            try {
                // Closing an already closed server socket has no effect.
                srvSock.close();
            }
            catch (IOException e) {
                if (err == null)
                    err = e;
                else
                    err.addSuppressed(e);
            }
        }

        srvSocks.clear();

        if (err != null)
            throw err;
    }

    /**
     * Test that Client successfully ignores wrong responses during Discovery Handshake Procedure.
     *
     * @throws Exception in case of error.
     */
    @Test
    public void testWrongHandshakeResponse() throws Exception {
        startTcpThread(new SomeResponseWorker(), SERVER_PORT);
        startTcpThread(new SomeResponseWorker(), LAST_SERVER_PORT);

        simpleTest();
    }

    /**
     * Test that Client successfully handles a server that reads the Discovery Handshake Request
     * but never sends any response.
     *
     * @throws Exception in case of error.
     */
    @Test
    public void testNoHandshakeResponse() throws Exception {
        startTcpThread(new NoResponseWorker(), SERVER_PORT);
        startTcpThread(new NoResponseWorker(), LAST_SERVER_PORT);

        simpleTest();
    }

    /**
     * Test that Client successfully ignores when server closes sockets after Discovery Handshake Request.
     *
     * @throws Exception in case of error.
     */
    @Test
    public void testDisconnectOnRequest() throws Exception {
        startTcpThread(new DisconnectOnRequestWorker(), SERVER_PORT);
        startTcpThread(new DisconnectOnRequestWorker(), LAST_SERVER_PORT);

        simpleTest();
    }

    /**
     * Test that Client successfully ignores when server closes sockets immediately.
     *
     * @throws Exception in case of error.
     */
    @Test
    public void testEarlyDisconnect() throws Exception {
        startTcpThread(new EarlyDisconnectWorker(), SERVER_PORT);
        startTcpThread(new EarlyDisconnectWorker(), LAST_SERVER_PORT);

        simpleTest();
    }

    /**
     * Some simple sanity check with the Server and Client.
     * It is expected that both client and server could successfully perform Discovery Procedure when there is
     * unknown (test) server in the ipFinder list.
     *
     * @throws Exception If a node failed to start or a check failed; propagated as is so that the test report
     *      keeps the original cause and stack trace.
     */
    private void simpleTest() throws Exception {
        Ignite srv = startGrid("server");
        Ignite client = startClientGrid("client");

        awaitPartitionMapExchange();

        assertEquals(2, srv.cluster().nodes().size());
        assertEquals(2, client.cluster().nodes().size());

        int accepted = connCnt.get();

        assertTrue("Unexpected number of connections accepted by non-Ignite servers: " + accepted, accepted >= 2);

        srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(1, 1);

        assertEquals(1, client.getOrCreateCache(DEFAULT_CACHE_NAME).get(1));
    }

    /**
     * Just a factory for runnable workers.
     */
    private interface WorkerFactory {
        /**
         * Creates a new worker for socket.
         *
         * @param clientSock socket for worker
         * @return runnable Worker
         */
        Runnable newWorker(Socket clientSock);
    }

    /**
     * Base worker serving a single accepted client connection: invokes {@link #action(InputStream, OutputStream)}
     * after every successful read and closes the socket when done.
     */
    private abstract class SocketWorker implements Runnable {
        /** Client socket. */
        final Socket clientSock;

        /**
         * @param clientSock Client socket.
         */
        SocketWorker(Socket clientSock) {
            this.clientSock = clientSock;
        }

        /** {@inheritDoc} */
        @Override public void run() {
            // try-with-resources guarantees the socket is closed even when read or action fails.
            try (Socket sock = clientSock) {
                InputStream input = sock.getInputStream();
                OutputStream output = sock.getOutputStream();

                byte[] buf = new byte[1024];

                // The data read is discarded; each non-empty read only triggers the action.
                while (!sock.isClosed() && input.read(buf) > 0)
                    action(input, output);
            }
            catch (IOException e) {
                log.error("Unexpected error", e);
            }
        }

        /**
         * Reaction to a chunk of data received from the client.
         *
         * @param input socket input stream
         * @param output socket output stream
         * @throws IOException IOException
         */
        public abstract void action(InputStream input, OutputStream output) throws IOException;
    }

    /**
     * Factory of workers which answer every received chunk with a fixed text that is not a discovery message.
     */
    private class SomeResponseWorker implements WorkerFactory {
        /** {@inheritDoc} */
        @Override public Runnable newWorker(Socket clientSock) {
            return new SocketWorker(clientSock) {
                @Override public void action(InputStream input, OutputStream output) throws IOException {
                    output.write("Some response".getBytes());

                    log.error("TEST: Some response was sent to " + clientSock.getRemoteSocketAddress());
                }
            };
        }
    }

    /**
     * Factory of workers which read incoming data but never write anything back.
     */
    private class NoResponseWorker implements WorkerFactory {
        /** {@inheritDoc} */
        @Override public Runnable newWorker(Socket clientSock) {
            return new SocketWorker(clientSock) {
                @Override public void action(InputStream input, OutputStream output) throws IOException {
                    log.error("TEST: No response was sent to " + clientSock.getRemoteSocketAddress());
                }
            };
        }
    }

    /**
     * Factory of workers which close the connection as soon as the first chunk of data is received.
     */
    private class DisconnectOnRequestWorker implements WorkerFactory {
        /** {@inheritDoc} */
        @Override public Runnable newWorker(Socket clientSock) {
            return new SocketWorker(clientSock) {
                @Override public void action(InputStream input, OutputStream output) throws IOException {
                    clientSock.close();

                    log.error("TEST: Socket closed for " + clientSock.getRemoteSocketAddress());
                }
            };
        }
    }

    /**
     * Factory of workers which close the connection immediately, without reading anything.
     */
    private class EarlyDisconnectWorker implements WorkerFactory {
        /** {@inheritDoc} */
        @Override public Runnable newWorker(Socket clientSock) {
            return new SocketWorker(clientSock) {
                @Override public void action(InputStream input, OutputStream output) throws IOException {
                    // No-op
                }

                /** Replaces the read loop of the base worker: the socket is closed right away. */
                @Override public void run() {
                    try {
                        clientSock.close();

                        log.error("TEST: Socket closed for " + clientSock.getRemoteSocketAddress());
                    }
                    catch (IOException e) {
                        log.error("Unexpected error", e);
                    }
                }
            };
        }
    }

    /**
     * TcpDiscoverySpi with non-shuffled resolved IP addresses. We should ensure that in this test non-Ignite server
     * is the first element of the addresses list.
     */
    class TcpDiscoverySpiWithOrderedIps extends TcpDiscoverySpi {
        /** {@inheritDoc} */
        @Override protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
            Collection<InetSocketAddress> shuffled = super.resolvedAddresses();

            List<InetSocketAddress> res = new ArrayList<>(shuffled);

            // Order by textual representation so that the resulting order does not depend on the order
            // returned by the parent SPI.
            Collections.sort(res, new Comparator<InetSocketAddress>() {
                @Override public int compare(InetSocketAddress o1, InetSocketAddress o2) {
                    return o1.toString().compareTo(o2.toString());
                }
            });

            return res;
        }
    }
}