blob: dc66277642d3ee19dfebeebca8eac5c9dd92feb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.AbstractFailureHandler;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.TestFailureHandler;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
/**
* Tests client to be able restore connection to cluster if coordination is not available.
*/
@WithSystemProperty(key = IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true")
public class IgniteClientRejoinTest extends GridCommonAbstractTest {
/** Block. */
private volatile boolean block;
/** Block all. */
private volatile boolean blockAll;
/** Coordinator. */
private volatile ClusterNode crd;
/** Client reconnect disabled. */
private boolean clientReconnectDisabled;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
clientReconnectDisabled = false;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
if (gridName.contains("client")) {
cfg.setCommunicationSpi(new TcpCommunicationSpi());
TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
DiscoverySpi dspi = new DiscoverySpi();
dspi.setIpFinder(spi.getIpFinder());
cfg.setDiscoverySpi(dspi);
dspi.setJoinTimeout(60_000);
dspi.setClientReconnectDisabled(clientReconnectDisabled);
}
// TODO: IGNITE-4833
cfg.setPeerClassLoadingEnabled(false);
return cfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientsReconnectAfterStart() throws Exception {
Ignite srv1 = startGrid("server1");
crd = ((IgniteKernal)srv1).localNode();
Ignite srv2 = startGrid("server2");
final CountDownLatch latch = new CountDownLatch(1);
List<Ignite> clientNodes = new ArrayList<>();
final int CLIENTS_NUM = 5;
for (int i = 0; i < CLIENTS_NUM; i++)
clientNodes.add(startClientGrid("client" + i));
blockAll = true;
GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
U.sleep(5_000);
block = true;
blockAll = false;
System.out.println(">>> Allow with blocked coordinator.");
latch.countDown();
return null;
}
});
IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
latch.await();
U.sleep((new Random().nextInt(15) + 30) * 1000);
block = false;
System.out.println(">>> Allow coordinator.");
return null;
}
});
fut.get();
for (Ignite client : clientNodes) {
while (true) {
try {
IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
for (int i = 0; i < 100; i++)
cache.put(i, i);
for (int i = 0; i < 100; i++)
assertEquals((Integer)i, cache.get(i));
cache.clear();
break;
}
catch (IgniteClientDisconnectedException e) {
e.reconnectFuture().get();
}
}
}
assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientsReconnect() throws Exception {
Ignite srv1 = startGrid("server1");
crd = ((IgniteKernal)srv1).localNode();
Ignite srv2 = startGrid("server2");
block = true;
List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);
final int CLIENTS_NUM = 5;
for (int i = 0; i < CLIENTS_NUM; i++) {
final int idx = i;
IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
@Override public Ignite call() throws Exception {
latch.await();
String nodeName = "client" + idx;
IgniteConfiguration cfg = getConfiguration(nodeName)
.setFailureHandler(new AbstractFailureHandler() {
@Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
// This should _not_ fire when exchange-worker terminates before reconnect.
Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
return false;
}
});
return startClientGrid(nodeName, optimize(cfg));
}
});
futs.add(fut);
}
GridTestUtils.runAsync(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
latch.countDown();
Random rnd = new Random();
U.sleep((rnd.nextInt(15) + 15) * 1000);
block = false;
System.out.println(">>> ALLOW connection to coordinator.");
return true;
}
});
for (IgniteInternalFuture<Ignite> clientFut : futs) {
Ignite client = clientFut.get();
IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
for (int i = 0; i < 100; i++)
cache.put(i, i);
for (int i = 0; i < 100; i++)
assert i == cache.get(i);
}
assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientsReconnectDisabled() throws Exception {
clientReconnectDisabled = true;
Ignite srv1 = startGrid("server1");
if (!tcpDiscovery())
return;
crd = ((IgniteKernal)srv1).localNode();
Ignite srv2 = startGrid("server2");
block = true;
List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);
final int CLIENTS_NUM = 5;
final CountDownLatch failureHndLatch = new CountDownLatch(CLIENTS_NUM);
for (int i = 0; i < CLIENTS_NUM; i++) {
final int idx = i;
IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
@Override public Ignite call() throws Exception {
latch.await();
String igniteInstanceName = "client" + idx;
return startClientGrid(igniteInstanceName, getConfiguration(igniteInstanceName)
.setFailureHandler(new TestFailureHandler(true, failureHndLatch)));
}
});
futs.add(fut);
}
latch.countDown();
for (final IgniteInternalFuture<Ignite> clientFut : futs) {
//noinspection ThrowableNotThrown
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
clientFut.get();
return null;
}
}, IgniteCheckedException.class, null);
}
assertTrue(failureHndLatch.await(1000, TimeUnit.MILLISECONDS));
assertEquals(0, srv1.cluster().forClients().nodes().size());
assertEquals(0, srv2.cluster().forClients().nodes().size());
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 3 * 60_000;
}
/**
*
*/
private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi {
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
if (blockAll || block && node.id().equals(crd.id()))
throw new IgniteSpiException(new SocketException("Test communication exception"));
super.sendMessage(node, msg);
}
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
if (blockAll || block && node.id().equals(crd.id()))
throw new IgniteSpiException(new SocketException("Test communication exception"));
super.sendMessage(node, msg, ackC);
}
}
/**
*
*/
private class DiscoverySpi extends TcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
long timeout) throws IOException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");
super.writeToSocket(sock, msg, data, timeout);
}
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");
super.writeToSocket(sock, msg, timeout);
}
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");
super.writeToSocket(sock, out, msg, timeout);
}
/** {@inheritDoc} */
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
long timeout) throws IOException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");
super.writeToSocket(msg, sock, res, timeout);
}
/** {@inheritDoc} */
@Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
if (blockAll || block && sock.getPort() == 47500)
throw new SocketException("Test discovery exception");
return super.openSocket(sock, remAddr, timeoutHelper);
}
}
}