blob: 3fc50361d8f0d63dcdb08711f1b1fc00407f11e9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
/**
 * Base class for client reconnect tests.
 * <p>
 * Nodes are configured with {@link TestTcpDiscoverySpi} and {@link BlockTcpCommunicationSpi}, with all event types
 * enabled. {@link #serverCount()} server nodes and {@link #clientCount()} client nodes are started before tests and
 * stopped after them. Helpers are provided to force client disconnect/reconnect and to wait for the corresponding
 * events.
 */
public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstractTest {
    /** Timeout (milliseconds) used when waiting for client disconnect/reconnect events. */
    private static final long RECONNECT_TIMEOUT = 10_000;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();

        disco.setIpFinder(sharedStaticIpFinder);
        disco.setJoinTimeout(2 * 60_000);
        disco.setSocketTimeout(1000);
        disco.setNetworkTimeout(2000);

        cfg.setDiscoverySpi(disco);

        BlockTcpCommunicationSpi commSpi = new BlockTcpCommunicationSpi();

        commSpi.setSharedMemoryPort(-1);

        cfg.setCommunicationSpi(commSpi);

        cfg.setIncludeEventTypes(EventType.EVTS_ALL);

        return cfg;
    }

    /**
     * Waits for the latch using this test's logger; see {@link #waitReconnectEvent(IgniteLogger, CountDownLatch)}.
     *
     * @param latch Latch.
     * @throws Exception If failed.
     */
    protected void waitReconnectEvent(CountDownLatch latch) throws Exception {
        waitReconnectEvent(log, latch);
    }

    /**
     * Waits up to {@link #RECONNECT_TIMEOUT} for the latch to reach zero. On timeout dumps all threads to the log
     * and fails the test.
     *
     * @param log Logger.
     * @param latch Latch.
     * @throws Exception If failed.
     */
    protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception {
        if (!latch.await(RECONNECT_TIMEOUT, MILLISECONDS)) {
            log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount());

            U.dumpThreads(log);

            fail("Failed to wait for disconnect/reconnect event.");
        }
    }

    /**
     * @return Number of server nodes started before tests.
     */
    protected abstract int serverCount();

    /**
     * @return Number of client nodes started before tests.
     */
    protected int clientCount() {
        return 0;
    }

    /**
     * @param ignite Node.
     * @return Discovery SPI.
     */
    protected static TestTcpDiscoverySpi spi(Ignite ignite) {
        return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
    }

    /**
     * @param ignite Node.
     * @return Discovery SPI.
     */
    protected static IgniteDiscoverySpi spi0(Ignite ignite) {
        return ((IgniteDiscoverySpi)ignite.configuration().getDiscoverySpi());
    }

    /**
     * @param ignite Node.
     * @return Communication SPI.
     */
    protected BlockTcpCommunicationSpi commSpi(Ignite ignite) {
        return ((BlockTcpCommunicationSpi)ignite.configuration().getCommunicationSpi());
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        int srvs = serverCount();

        if (srvs > 0)
            startGrids(srvs);

        int clients = clientCount();

        // Client indexes continue after the server indexes.
        if (clients > 0)
            startClientGridsMultiThreaded(srvs, clients);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        super.afterTestsStopped();

        stopAllGrids();
    }

    /**
     * Finds the server node the client is connected to. With TCP discovery this is the client's router node;
     * otherwise it is the first non-client node found among all started grids.
     *
     * @param client Client.
     * @return Server node client connected to.
     */
    protected Ignite clientRouter(Ignite client) {
        if (tcpDiscovery()) {
            TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();

            assertTrue(node.isClient());
            assertNotNull(node.clientRouterNodeId());

            Ignite srv = G.ignite(node.clientRouterNodeId());

            assertNotNull(srv);

            return srv;
        }

        for (Ignite node : G.allGrids()) {
            if (!node.cluster().localNode().isClient())
                return node;
        }

        fail("Failed to find server node.");

        return null;
    }

    /**
     * Asserts that the future is not null and not yet completed.
     *
     * @param fut Future.
     * @throws Exception If failed.
     */
    protected void assertNotDone(IgniteInternalFuture<?> fut) throws Exception {
        assertNotNull(fut);

        if (fut.isDone())
            fail("Future completed with result: " + fut.get());
    }

    /**
     * Reconnect client node.
     *
     * @param client Client.
     * @param srv Server.
     * @param disconnectedC Closure which will be run when client node disconnected.
     * @throws Exception If failed.
     */
    protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
        throws Exception {
        reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC);
    }

    /**
     * Reconnect client node.
     *
     * @param log Logger.
     * @param client Client.
     * @param srv Server.
     * @param disconnectedC Closure which will be run when client node disconnected.
     * @throws Exception If failed.
     */
    public static void reconnectClientNode(IgniteLogger log,
        Ignite client,
        Ignite srv,
        @Nullable Runnable disconnectedC)
        throws Exception {
        reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC);
    }

    /**
     * Reconnects the given client nodes. Joins from the clients are blocked, the server fails every client,
     * {@code disconnectedC} is run once all clients report disconnect, then joins are unblocked and the method waits
     * for all clients to report reconnect.
     *
     * @param log Logger.
     * @param clients Clients.
     * @param srv Server.
     * @param disconnectedC Closure which will be run when client node disconnected.
     * @throws Exception If failed.
     */
    protected static void reconnectClientNodes(final IgniteLogger log,
        List<Ignite> clients, Ignite srv,
        @Nullable Runnable disconnectedC)
        throws Exception {
        final IgniteDiscoverySpi srvSpi = spi0(srv);

        final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
        final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());

        log.info("Block reconnect.");

        List<DiscoverySpiTestListener> blockLsnrs = new ArrayList<>();

        for (Ignite client : clients) {
            DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();

            lsnr.startBlockJoin();

            blockLsnrs.add(lsnr);

            spi0(client).setInternalListener(lsnr);
        }

        IgnitePredicate<Event> p = reconnectListener(log, disconnectLatch, reconnectLatch);

        try {
            for (Ignite client : clients)
                client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);

            for (Ignite client : clients)
                srvSpi.failNode(client.cluster().localNode().id(), null);

            waitReconnectEvent(log, disconnectLatch);

            if (disconnectedC != null)
                disconnectedC.run();

            log.info("Allow reconnect.");

            for (DiscoverySpiTestListener blockLsnr : blockLsnrs)
                blockLsnr.stopBlockJoin();

            waitReconnectEvent(log, reconnectLatch);

            for (Ignite client : clients)
                client.events().stopLocalListen(p);
        }
        finally {
            // Always release blocked joins so a failed test does not leave clients stuck.
            for (DiscoverySpiTestListener blockLsnr : blockLsnrs)
                blockLsnr.stopBlockJoin();
        }
    }

    /**
     * Closes the given servers, waits for the client to disconnect, starts servers with {@code srvStartC} and waits
     * for the client to reconnect.
     *
     * @param log Logger.
     * @param client Client node.
     * @param srvs Server nodes to stop.
     * @param srvStartC Closure starting server nodes.
     * @throws Exception If failed.
     * @return Restarted servers.
     */
    public static Collection<Ignite> reconnectServersRestart(final IgniteLogger log,
        Ignite client,
        Collection<Ignite> srvs,
        Callable<Collection<Ignite>> srvStartC)
        throws Exception {
        final CountDownLatch disconnectLatch = new CountDownLatch(1);
        final CountDownLatch reconnectLatch = new CountDownLatch(1);

        IgnitePredicate<Event> p = reconnectListener(log, disconnectLatch, reconnectLatch);

        client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);

        for (Ignite srv : srvs)
            srv.close();

        assertTrue("Failed to wait for client disconnect.", disconnectLatch.await(30_000, MILLISECONDS));

        Collection<Ignite> startedSrvs = srvStartC.call();

        waitReconnectEvent(log, reconnectLatch);

        // Client is connected again, remove the listener so repeated calls do not accumulate listeners.
        client.events().stopLocalListen(p);

        return startedSrvs;
    }

    /**
     * Creates an event listener that counts down {@code disconnectLatch} on {@link EventType#EVT_CLIENT_NODE_DISCONNECTED}
     * and {@code reconnectLatch} on {@link EventType#EVT_CLIENT_NODE_RECONNECTED}. The listener always returns
     * {@code true}, so it stays subscribed until explicitly removed.
     *
     * @param log Logger.
     * @param disconnectLatch Latch counted down on disconnect.
     * @param reconnectLatch Latch counted down on reconnect.
     * @return Event listener.
     */
    private static IgnitePredicate<Event> reconnectListener(final IgniteLogger log,
        final CountDownLatch disconnectLatch,
        final CountDownLatch reconnectLatch) {
        return new IgnitePredicate<Event>() {
            @Override public boolean apply(Event evt) {
                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
                    log.info("Disconnected: " + evt);

                    disconnectLatch.countDown();
                }
                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                    log.info("Reconnected: " + evt);

                    reconnectLatch.countDown();
                }

                return true;
            }
        };
    }

    /**
     * Checks that the cause of the exception is {@link IgniteClientDisconnectedException} with a reconnect future.
     *
     * @param e Client disconnected exception.
     * @return Reconnect future.
     */
    protected IgniteFuture<?> check(CacheException e) {
        log.info("Expected exception: " + e);

        Throwable cause = e.getCause();

        if (!(cause instanceof IgniteClientDisconnectedException))
            log.error("Unexpected cause: " + cause, e);

        assertTrue("Unexpected cause: " + cause, cause instanceof IgniteClientDisconnectedException);

        IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)cause;

        assertNotNull(e0.reconnectFuture());

        return e0.reconnectFuture();
    }

    /**
     * Checks the exception (see {@link #check(CacheException)}) and waits for the reconnect future.
     *
     * @param e Client disconnected exception.
     */
    protected void checkAndWait(CacheException e) {
        check(e).get();
    }

    /**
     * Checks that the exception has a reconnect future and waits for it.
     *
     * @param e Client disconnected exception.
     */
    protected void checkAndWait(IgniteClientDisconnectedException e) {
        log.info("Expected exception: " + e);

        assertNotNull(e.reconnectFuture());

        e.reconnectFuture().get();
    }

    /**
     * Discovery SPI that can hold back sending of {@link TcpDiscoveryJoinRequestMessage} while {@link #writeLatch}
     * is set.
     */
    public static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
        /** If not {@code null}, join request writes wait for this latch before being sent. */
        volatile CountDownLatch writeLatch;

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout)
            throws IOException, IgniteCheckedException {
            if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                CountDownLatch writeLatch0 = writeLatch;

                if (writeLatch0 != null) {
                    log.info("Block join request send: " + msg);

                    U.await(writeLatch0);
                }
            }

            super.writeToSocket(sock, out, msg, timeout);
        }
    }

    /**
     * Communication SPI that can silently drop {@link GridIoMessage}s whose payload is of a given class, and can
     * record the payload class names of sent {@link GridIoMessage}s.
     */
    protected static class BlockTcpCommunicationSpi extends TcpCommunicationSpi {
        /** Payload class of messages to drop; {@code null} means nothing is blocked. */
        volatile Class<?> msgCls;

        /** Whether payload class names of sent messages are being collected. */
        AtomicBoolean collectStart = new AtomicBoolean(false);

        /** Collected payload class name mapped to the node it was last sent to. */
        ConcurrentHashMap<String, ClusterNode> classes = new ConcurrentHashMap<>();

        /** Logger. */
        @LoggerResource
        private IgniteLogger log;

        /** {@inheritDoc} */
        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
            throws IgniteSpiException {
            // Read the volatile field once and use only the snapshot, so the check is consistent.
            Class<?> msgCls0 = msgCls;

            if (msg instanceof GridIoMessage) {
                Message payload = ((GridIoMessage)msg).message();

                if (collectStart.get())
                    classes.put(payload.getClass().getName(), node);

                if (msgCls0 != null && payload.getClass().equals(msgCls0)) {
                    log.info("Block message: " + msg);

                    return;
                }
            }

            super.sendMessage(node, msg, ackC);
        }

        /**
         * @param clazz Class of messages which will be block.
         */
        public void blockMessage(Class<?> clazz) {
            msgCls = clazz;
        }

        /**
         * Unlock all message.
         */
        public void unblockMessage() {
            msgCls = null;
        }

        /**
         * Start collect messages.
         */
        public void start() {
            collectStart.set(true);
        }

        /**
         * Print collected messages.
         */
        public void print() {
            for (String s : classes.keySet())
                log.error(s);
        }
    }
}