blob: 7a8e3fd8ef582f52776d5a8c31eae2468db2caaf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.zk.internal;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiMBean;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestUtil;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
/**
* Tests for Zookeeper SPI discovery.
*/
public class ZookeeperDiscoveryClientDisconnectTest extends ZookeeperDiscoverySpiTestBase {
/** */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
// we reduce fealure detection tp speedup failure detection on catch(Exception) clause in createTcpClient().
cfg.setFailureDetectionTimeout(2000);
cfg.setClientFailureDetectionTimeout(2000);
return cfg;
}
/**
* Test reproduces failure in case of client resolution failure
* {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi#createTcpClient} from server side, further
* client reconnect and proper grid work.
*
* @throws Exception If failed.
*/
@Test
public void testClientReconnects() throws Exception {
blockCommSpi = true;
Ignite srv1 = startGrid("server1-block");
IgniteEx cli = startClientGrid("client-block");
IgniteCache<Object, Object> cache = cli.getOrCreateCache(DEFAULT_CACHE_NAME);
cache.put(1, 1);
assertEquals(cache.get(1), 1);
assertEquals(1, srv1.cluster().forClients().nodes().size());
MBeanServer srv = ManagementFactory.getPlatformMBeanServer();
IgniteEx ignite = grid("server1-block");
ObjectName spiName = U.makeMBeanName(ignite.context().igniteInstanceName(), "SPIs",
ZookeeperDiscoverySpi.class.getSimpleName());
ZookeeperDiscoverySpiMBean bean = JMX.newMBeanProxy(srv, spiName, ZookeeperDiscoverySpiMBean.class);
assertNotNull(bean);
assertEquals(0, bean.getCommErrorProcNum());
}
/**
* @throws Exception If failed.
*/
@Test
public void testConnectionCheck() throws Exception {
final int NODES = 5;
startGridsMultiThreaded(NODES);
for (int i = 0; i < NODES; i++) {
Ignite node = ignite(i);
TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi();
List<ClusterNode> nodes = new ArrayList<>(node.cluster().nodes());
BitSet res = spi.checkConnection(nodes).get();
for (int j = 0; j < NODES; j++)
assertTrue(res.get(j));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testReconnectDisabled_ConnectionLost() throws Exception {
clientReconnectDisabled = true;
startGrid(0);
sesTimeout = 3000;
testSockNio = true;
Ignite client = startClientGrid(1);
final CountDownLatch latch = new CountDownLatch(1);
client.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
latch.countDown();
return false;
}
}, EventType.EVT_NODE_SEGMENTED);
ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(client);
nio.closeSocket(true);
try {
ZookeeperDiscoverySpiTestHelper.waitNoAliveZkNodes(log,
zkCluster.getConnectString(),
Collections.singletonList(ZookeeperDiscoverySpiTestHelper.aliveZkNodePath(client)),
10_000);
}
finally {
nio.allowConnect();
}
assertTrue(latch.await(10, SECONDS));
}
/**
* @throws Exception If failed.
*/
@Test
public void testServersLeft_FailOnTimeout() throws Exception {
startGrid(0);
final int CLIENTS = 5;
joinTimeout = 3000;
startClientGridsMultiThreaded(1, CLIENTS);
waitForTopology(CLIENTS + 1);
final CountDownLatch latch = new CountDownLatch(CLIENTS);
for (int i = 0; i < CLIENTS; i++) {
Ignite node = ignite(i + 1);
node.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
latch.countDown();
return false;
}
}, EventType.EVT_NODE_SEGMENTED);
}
stopGrid(getTestIgniteInstanceName(0), true, false);
assertTrue(latch.await(10, SECONDS));
evts.clear();
}
/**
*
*/
@Test
public void testStartNoServers_FailOnTimeout() {
joinTimeout = 3000;
long start = System.currentTimeMillis();
Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
startClientGrid(0);
return null;
}
}, IgniteCheckedException.class, null);
assertTrue(System.currentTimeMillis() >= start + joinTimeout);
IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class);
assertNotNull(spiErr);
assertTrue(spiErr.getMessage().contains("Failed to connect to cluster within configured timeout"));
}
/**
* @throws Exception If failed.
*/
@Test
public void testStartNoServer_WaitForServers1() throws Exception {
startNoServer_WaitForServers(0);
}
/**
* @throws Exception If failed.
*/
@Test
public void testStartNoServer_WaitForServers2() throws Exception {
startNoServer_WaitForServers(10_000);
}
/**
* @param joinTimeout Join timeout.
* @throws Exception If failed.
*/
private void startNoServer_WaitForServers(long joinTimeout) throws Exception {
this.joinTimeout = joinTimeout;
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
startClientGrid(0);
return null;
}
});
U.sleep(3000);
helper.waitSpi(getTestIgniteInstanceName(0), spis);
startGrid(1);
fut.get();
waitForTopology(2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisconnectOnServersLeft_1() throws Exception {
disconnectOnServersLeft(1, 1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisconnectOnServersLeft_2() throws Exception {
disconnectOnServersLeft(5, 1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisconnectOnServersLeft_3() throws Exception {
disconnectOnServersLeft(1, 10);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisconnectOnServersLeft_4() throws Exception {
disconnectOnServersLeft(5, 10);
}
/**
* @throws Exception If failed.
*/
@Test
public void testDisconnectOnServersLeft_5() throws Exception {
joinTimeout = 10_000;
disconnectOnServersLeft(5, 10);
}
/**
* @param srvs Number of servers.
* @param clients Number of clients.
* @throws Exception If failed.
*/
private void disconnectOnServersLeft(int srvs, int clients) throws Exception {
startGridsMultiThreaded(srvs);
startClientGridsMultiThreaded(srvs, clients);
for (int i = 0; i < GridTestUtils.SF.applyLB(5, 2); i++) {
info("Iteration: " + i);
final CountDownLatch disconnectLatch = new CountDownLatch(clients);
final CountDownLatch reconnectLatch = new CountDownLatch(clients);
IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
log.info("Disconnected: " + evt);
disconnectLatch.countDown();
}
else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
log.info("Reconnected: " + evt);
reconnectLatch.countDown();
return false;
}
return true;
}
};
for (int c = 0; c < clients; c++) {
Ignite client = ignite(srvs + c);
assertTrue(client.configuration().isClientMode());
client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
}
log.info("Stop all servers.");
GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
@Override public void apply(Integer threadIdx) {
stopGrid(getTestIgniteInstanceName(threadIdx), true, false);
}
}, srvs, "stop-server");
ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, disconnectLatch);
evts.clear();
log.info("Restart servers.");
startGridsMultiThreaded(0, srvs);
ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, reconnectLatch);
waitForTopology(srvs + clients);
log.info("Reconnect finished.");
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testStartNoZk() throws Exception {
stopZkCluster();
sesTimeout = 30_000;
zkCluster = ZookeeperDiscoverySpiTestUtil.createTestingCluster(3);
try {
final AtomicInteger idx = new AtomicInteger();
IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
startGrid(idx.getAndIncrement());
return null;
}
}, 5, "start-node");
U.sleep(5000);
assertFalse(fut.isDone());
zkCluster.start();
fut.get();
waitForTopology(5);
}
finally {
zkCluster.start();
}
}
}