blob: ba17a2fdca42be88da2156fff9eab55c7e2b7b6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
import org.apache.ignite.internal.processors.query.DummyQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
/**
 * Tests for Zookeeper SPI discovery: concurrent and random topology changes (optionally with ZK server restarts
 * and forced closes of ZK client sockets), client node reconnects, large user attributes and large custom
 * messages, service deployment, node ping, duplicated node IDs and persistence-enabled topologies.
 */
public class ZookeeperDiscoveryTopologyChangeAndReconnectTest extends ZookeeperDiscoverySpiTestBase {
    /** {@code True} if indexing disabled. */
    private boolean indexingDisabled;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setIncludeEventTypes(EventType.EVTS_ALL);

        // Swap in a dummy indexing implementation; the static field is reset in afterTest().
        if (indexingDisabled)
            GridQueryProcessor.idxCls = DummyQueryIndexing.class;

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        try {
            super.afterTest();
        }
        finally {
            // GridQueryProcessor.idxCls is static: always reset it, even when the base cleanup fails,
            // so the dummy indexing class does not leak into subsequent tests.
            indexingDisabled = false;

            GridQueryProcessor.idxCls = null;
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTopologyChangeMultithreaded() throws Exception {
        topologyChangeWithRestarts(false, false);
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138")
    @Test
    public void testTopologyChangeMultithreaded_RestartZk() throws Exception {
        try {
            topologyChangeWithRestarts(true, false);
        }
        finally {
            zkCluster.close();

            zkCluster = null;
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138")
    @Test
    public void testTopologyChangeMultithreaded_RestartZk_CloseClients() throws Exception {
        try {
            topologyChangeWithRestarts(true, true);
        }
        finally {
            zkCluster.close();

            zkCluster = null;
        }
    }

    /**
     * Starts 10 nodes, then until the time limit alternately starts (1-5 at a time, concurrently) or stops
     * (1-5 distinct nodes at a time, concurrently, once 20 nodes are running) server nodes.
     *
     * @param restartZk If {@code true} in background restarts one of ZK servers.
     * @param closeClientSock If {@code true} in background closes zk clients' sockets.
     * @throws Exception If failed.
     */
    private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception {
        sesTimeout = 30_000;

        if (closeClientSock)
            testSockNio = true;

        long stopTime = System.currentTimeMillis() + GridTestUtils.SF.applyLB(30_000, 5_000);

        AtomicBoolean stop = new AtomicBoolean();

        IgniteInternalFuture<?> fut1;

        IgniteInternalFuture<?> fut2;

        try {
            fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null;
            fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null;

            int INIT_NODES = 10;

            startGridsMultiThreaded(INIT_NODES);

            final int MAX_NODES = 20;

            final List<Integer> startedNodes = new ArrayList<>();

            for (int i = 0; i < INIT_NODES; i++)
                startedNodes.add(i);

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            final AtomicInteger startIdx = new AtomicInteger(INIT_NODES);

            while (System.currentTimeMillis() < stopTime) {
                if (startedNodes.size() >= MAX_NODES) {
                    int stopNodes = rnd.nextInt(5) + 1;

                    log.info("Next, stop nodes: " + stopNodes);

                    final List<Integer> idxs = new ArrayList<>();

                    // Deduplicate on the node index itself (not on its position in startedNodes),
                    // otherwise the same node could be picked twice and stopped by two threads.
                    while (idxs.size() < stopNodes) {
                        int stopIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));

                        if (!idxs.contains(stopIdx))
                            idxs.add(stopIdx);
                    }

                    GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
                        @Override public void apply(Integer threadIdx) {
                            int stopNodeIdx = idxs.get(threadIdx);

                            info("Stop node: " + stopNodeIdx);

                            stopGrid(stopNodeIdx);
                        }
                    }, stopNodes, "stop-node");

                    startedNodes.removeAll(idxs);
                }
                else {
                    int startNodes = rnd.nextInt(5) + 1;

                    log.info("Next, start nodes: " + startNodes);

                    GridTestUtils.runMultiThreaded(new Callable<Void>() {
                        @Override public Void call() throws Exception {
                            int idx = startIdx.incrementAndGet();

                            log.info("Start node: " + idx);

                            startGrid(idx);

                            // Several starter threads append concurrently.
                            synchronized (startedNodes) {
                                startedNodes.add(idx);
                            }

                            return null;
                        }
                    }, startNodes, "start-node");
                }

                U.sleep(rnd.nextInt(100) + 1);
            }
        }
        finally {
            stop.set(true);
        }

        if (fut1 != null)
            fut1.get();

        if (fut2 != null)
            fut2.get();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRandomTopologyChanges() throws Exception {
        randomTopologyChanges(false, false);
    }

    /**
     * Connects a separate ZooKeeper client to the test ZK cluster and checks that, apart from alive-node znodes,
     * direct children of the Ignite root and (see IGNITE-8193) {@code jd/} znodes, no znodes remain under the
     * Ignite root within 10 seconds.
     *
     * @throws Exception If failed.
     */
    private void checkZkNodesCleanup() throws Exception {
        final ZookeeperClient zkClient = new ZookeeperClient(getTestResources().getLogger(),
            zkCluster.getConnectString(),
            30_000,
            null);

        final String basePath = ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT + "/";

        final String aliveDir = basePath + ZkIgnitePaths.ALIVE_NODES_DIR + "/";

        try {
            List<String> znodes = listSubTree(zkClient.zk(), ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT);

            boolean foundAlive = false;

            for (String znode : znodes) {
                if (znode.startsWith(aliveDir)) {
                    foundAlive = true;

                    break;
                }
            }

            assertTrue(foundAlive); // Sanity check to make sure we check correct directory.

            assertTrue("Failed to wait for unused znodes cleanup", GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    try {
                        List<String> znodes = listSubTree(zkClient.zk(), ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT);

                        for (String znode : znodes) {
                            if (znode.startsWith(aliveDir) || znode.length() < basePath.length())
                                continue;

                            znode = znode.substring(basePath.length());

                            if (!znode.contains("/")) // Ignore roots.
                                continue;

                            // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193
                            if (znode.startsWith("jd/"))
                                continue;

                            log.info("Found unexpected znode: " + znode);

                            return false;
                        }

                        return true;
                    }
                    catch (Exception e) {
                        error("Unexpected error: " + e, e);

                        fail("Unexpected error: " + e);
                    }

                    return false;
                }
            }, 10_000));
        }
        finally {
            zkClient.close();
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138")
    @Test
    public void testRandomTopologyChanges_RestartZk() throws Exception {
        randomTopologyChanges(true, false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRandomTopologyChanges_CloseClients() throws Exception {
        randomTopologyChanges(false, true);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testDeployService1() throws Exception {
        startGridsMultiThreaded(3);

        grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testDeployService2() throws Exception {
        startGrid(0);

        startClientGrid(1);

        grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
    }

    /**
     * Starts a client and a server node concurrently, then deploys a node singleton service from the client.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDeployService3() throws Exception {
        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() throws Exception {
                startClientGrid(0);

                return null;
            }
        }, "start-node");

        startGrid(1);

        fut.get();

        grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
    }

    /**
     * Test with large user attribute on coordinator node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLargeUserAttribute1() throws Exception {
        initLargeAttribute();

        startGrid(0);

        checkZkNodesCleanup();

        userAttrs = null;

        startGrid(1);

        helper.waitForEventsAcks(ignite(0));

        waitForTopology(2);
    }

    /**
     * Test with large user attribute on non-coordinator node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLargeUserAttribute2() throws Exception {
        startGrid(0);

        initLargeAttribute();

        startGrid(1);

        helper.waitForEventsAcks(ignite(0));

        checkZkNodesCleanup();
    }

    /**
     * Test with large user attributes on random nodes.
     * Also tests that big messages (more than 1MB) properly separated and processed by zk.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLargeUserAttribute3() throws Exception {
        // Three distinct node indexes out of [0, 10) get the large attribute.
        Set<Integer> idxs = ThreadLocalRandom.current()
            .ints(0, 10)
            .distinct()
            .limit(3)
            .boxed()
            .collect(Collectors.toSet());

        for (int i = 0; i < 10; i++) {
            info("Iteration: " + i);

            if (idxs.contains(i))
                initLargeAttribute();
            else
                userAttrs = null;

            if (i > 5)
                startClientGrid(i);
            else
                startGrid(i);
        }

        waitForTopology(10);
    }

    /**
     * Replaces {@code userAttrs} with a new map holding a single {@code "testAttr"} attribute: an {@code int}
     * array of at least {@code 1024 * 1024} elements (plus a random extra of up to {@code 1024 * 512}).
     */
    private void initLargeAttribute() {
        userAttrs = new HashMap<>();

        int[] attr = new int[1024 * 1024 + ThreadLocalRandom.current().nextInt(1024 * 512)];

        for (int i = 0; i < attr.length; i++)
            attr[i] = i;

        userAttrs.put("testAttr", attr);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLargeCustomEvent() throws Exception {
        Ignite srv0 = startGrid(0);

        // Send large message, single node in topology.
        IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration("c1"));

        for (int i = 0; i < 100; i++)
            cache.put(i, i);

        assertEquals(1, cache.get(1));

        helper.waitForEventsAcks(ignite(0));

        startGridsMultiThreaded(1, 3);

        srv0.destroyCache("c1");

        // Send large message, multiple nodes in topology.
        cache = srv0.createCache(largeCacheConfiguration("c1"));

        for (int i = 0; i < 100; i++)
            cache.put(i, i);

        waitForTopology(4);

        ignite(3).createCache(largeCacheConfiguration("c2"));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientReconnectSessionExpire1_1() throws Exception {
        clientReconnectSessionExpire(false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testClientReconnectSessionExpire1_2() throws Exception {
        clientReconnectSessionExpire(true);
    }

    /**
     * Starts a server and a client, forces the client to reconnect, then checks that cache data written before
     * the reconnect is still readable and that compute works from the client.
     *
     * @param closeSock Test mode flag: {@code true} to close the client's ZK socket, {@code false} to expire
     *      its ZK session.
     * @throws Exception If failed.
     */
    private void clientReconnectSessionExpire(boolean closeSock) throws Exception {
        startGrid(0);

        sesTimeout = 2000;
        testSockNio = true;

        Ignite client = startClientGrid(1);

        client.cache(DEFAULT_CACHE_NAME).put(1, 1);

        reconnectClientNodes(log, Collections.singletonList(client), closeSock);

        assertEquals(1, client.cache(DEFAULT_CACHE_NAME).get(1));

        client.compute().broadcast(new ZookeeperDiscoverySpiTestHelper.DummyCallable(null));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testForceClientReconnect() throws Exception {
        final int SRVS = 3;

        startGrids(SRVS);

        startClientGrid(SRVS);

        reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable<Void>() {
            @Override public Void call() throws Exception {
                ZookeeperDiscoverySpi spi = helper.waitSpi(getTestIgniteInstanceName(SRVS), spis);

                spi.clientReconnect();

                return null;
            }
        });

        waitForTopology(SRVS + 1);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testForcibleClientFail() throws Exception {
        final int SRVS = 3;

        startGrids(SRVS);

        startClientGrid(SRVS);

        reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable<Void>() {
            @Override public Void call() throws Exception {
                ZookeeperDiscoverySpi spi = helper.waitSpi(getTestIgniteInstanceName(0), spis);

                spi.failNode(ignite(SRVS).cluster().localNode().id(), "Test forcible node fail");

                return null;
            }
        });

        waitForTopology(SRVS + 1);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testDuplicatedNodeId() throws Exception {
        indexingDisabled = true;

        UUID nodeId0 = nodeId = UUID.randomUUID();

        startGrid(0);

        int failingNodeIdx = 100;

        for (int i = 0; i < 2; i++) {
            final int idx = failingNodeIdx++;

            nodeId = nodeId0;

            info("Start node with duplicated ID [iter=" + i + ", nodeId=" + nodeId + ']');

            GridTestUtils.assertThrowsAnyCause(log,
                () -> startGrid(idx), IgniteSpiException.class, "Node with the same ID already exists");

            nodeId = null;

            info("Start node with unique ID [iter=" + i + ']');

            Ignite ignite = startGrid(idx);

            nodeId0 = ignite.cluster().localNode().id();

            waitForTopology(i + 2);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPing() throws Exception {
        sesTimeout = 5000;

        startGrids(3);

        final ZookeeperDiscoverySpi spi = helper.waitSpi(getTestIgniteInstanceName(1), spis);

        final UUID nodeId = ignite(2).cluster().localNode().id();

        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
            @Override public void run() {
                assertTrue(spi.pingNode(nodeId));
            }
        }, 32, "ping");

        fut.get();

        // Ping concurrently with the node stop: the result is not checked here.
        fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
            @Override public void run() {
                spi.pingNode(nodeId);
            }
        }, 32, "ping");

        U.sleep(100);

        stopGrid(2);

        fut.get();

        fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
            @Override public void run() {
                assertFalse(spi.pingNode(nodeId));
            }
        }, 32, "ping");

        fut.get();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWithPersistence1() throws Exception {
        startWithPersistence(false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWithPersistence2() throws Exception {
        startWithPersistence(true);
    }

    /**
     * Reconnect client node.
     * <p>
     * Either closes the clients' ZK sockets ({@code closeSock == true}) or expires their ZK sessions by opening
     * and closing a second ZooKeeper client with the same session ID and password. Then waits until the clients'
     * alive znodes disappear and both the disconnect and reconnect events are received from every client.
     *
     * @param log Logger.
     * @param clients Clients (must not be empty: the first client's SPI provides the ZK connection string).
     * @param closeSock {@code True} to simulate reconnect by closing zk client's socket.
     * @throws Exception If failed.
     */
    private static void reconnectClientNodes(final IgniteLogger log,
        List<Ignite> clients,
        boolean closeSock)
        throws Exception {
        final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
        final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());

        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
            @Override public boolean apply(Event evt) {
                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
                    log.info("Disconnected: " + evt);

                    disconnectLatch.countDown();
                }
                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                    log.info("Reconnected: " + evt);

                    reconnectLatch.countDown();
                }

                return true;
            }
        };

        try {
            List<String> zkNodes = new ArrayList<>();

            for (Ignite client : clients) {
                client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);

                zkNodes.add(ZookeeperDiscoverySpiTestHelper.aliveZkNodePath(client));
            }

            long timeout = 15_000;

            if (closeSock) {
                for (Ignite client : clients) {
                    ZookeeperDiscoverySpi spi = (ZookeeperDiscoverySpi)client.configuration().getDiscoverySpi();

                    ZkTestClientCnxnSocketNIO.forNode(client.name()).closeSocket(true);

                    // Wait for alive znodes at least 1.5 session timeouts.
                    timeout = Math.max(timeout, (long)(spi.getSessionTimeout() * 1.5f));
                }
            }
            else {
                /*
                 * Use hack to simulate session expire without waiting session timeout:
                 * create and close ZooKeeper with the same session ID as ignite node's ZooKeeper.
                 */
                List<ZooKeeper> dummyClients = new ArrayList<>();

                try {
                    for (Ignite client : clients) {
                        ZookeeperDiscoverySpi spi = (ZookeeperDiscoverySpi)client.configuration().getDiscoverySpi();

                        ZooKeeper zk = ZookeeperDiscoverySpiTestHelper.zkClient(spi);

                        // Try servers one by one until one accepts the hijacked session.
                        for (String s : spi.getZkConnectionString().split(",")) {
                            try {
                                ZooKeeper dummyZk = new ZooKeeper(
                                    s,
                                    10_000,
                                    null,
                                    zk.getSessionId(),
                                    zk.getSessionPasswd());

                                dummyZk.exists("/a", false);

                                dummyClients.add(dummyZk);

                                break;
                            }
                            catch (Exception e) {
                                // NOTE(review): a dummyZk created before exists() failed is not closed here.
                                log.warning("Can't connect to server " + s + " [err=" + e + ']');
                            }
                        }
                    }
                }
                finally {
                    // Closing the dummy clients is what expires the shared sessions; also releases them on failure.
                    for (ZooKeeper dummy : dummyClients)
                        dummy.close();
                }
            }

            ZookeeperDiscoverySpiTestHelper.waitNoAliveZkNodes(log,
                ((ZookeeperDiscoverySpi)clients.get(0).configuration().getDiscoverySpi()).getZkConnectionString(),
                zkNodes,
                timeout);

            if (closeSock) {
                for (Ignite client : clients)
                    ZkTestClientCnxnSocketNIO.forNode(client.name()).allowConnect();
            }

            ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, disconnectLatch);

            ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, reconnectLatch);
        }
        finally {
            // Always unregister the listener, even when waiting for the events fails.
            for (Ignite client : clients)
                client.events().stopLocalListen(p);
        }
    }

    /**
     * Lists the subtree of the given znode, retrying up to 30 times on {@link KeeperException.NoNodeException}
     * (raised when a znode disappears while the tree is being traversed).
     *
     * @param zk ZooKeeper client.
     * @param root Root path.
     * @return All children znodes for given path.
     * @throws Exception If failed.
     */
    private List<String> listSubTree(ZooKeeper zk, String root) throws Exception {
        KeeperException.NoNodeException lastErr = null;

        for (int i = 0; i < 30; i++) {
            try {
                return ZKUtil.listSubTreeBFS(zk, root);
            }
            catch (KeeperException.NoNodeException e) {
                info("NoNodeException when get znodes, will retry: " + e);

                lastErr = e;
            }
        }

        // Keep the last failure as the cause so it is not lost.
        throw new Exception("Failed to get znodes: " + root, lastErr);
    }

    /**
     * @param cacheName Cache name.
     * @return {@code FULL_SYNC} cache configuration whose affinity function carries a {@code 1024 * 1024}
     *      element dummy array, making the configuration large.
     */
    private CacheConfiguration<Object, Object> largeCacheConfiguration(String cacheName) {
        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(cacheName);

        ccfg.setAffinity(new TestAffinityFunction(1024 * 1024));
        ccfg.setWriteSynchronizationMode(FULL_SYNC);

        return ccfg;
    }

    /**
     * Runs the given closure and waits until every client has received both the disconnect and the reconnect
     * event.
     *
     * @param clients Clients.
     * @param c Closure to run.
     * @throws Exception If failed.
     */
    private void reconnectClientNodes(List<Ignite> clients, Callable<Void> c)
        throws Exception {
        final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
        final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());

        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
            @Override public boolean apply(Event evt) {
                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
                    log.info("Disconnected: " + evt);

                    disconnectLatch.countDown();
                }
                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                    log.info("Reconnected: " + evt);

                    reconnectLatch.countDown();
                }

                return true;
            }
        };

        try {
            for (Ignite client : clients)
                client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);

            c.call();

            ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, disconnectLatch);

            ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, reconnectLatch);
        }
        finally {
            // Always unregister the listener, even when the closure or the wait fails.
            for (Ignite client : clients)
                client.events().stopLocalListen(p);
        }
    }

    /**
     * Until the time limit, randomly starts/stops server nodes (up to 20) and, roughly one step in ten,
     * creates/destroys caches (up to 10) on a random started node.
     *
     * @param restartZk If {@code true} in background restarts one of ZK servers.
     * @param closeClientSock If {@code true} in background closes zk clients' sockets.
     * @throws Exception If failed.
     */
    private void randomTopologyChanges(boolean restartZk, boolean closeClientSock) throws Exception {
        sesTimeout = 30_000;

        if (closeClientSock)
            testSockNio = true;

        List<Integer> startedNodes = new ArrayList<>();
        List<String> startedCaches = new ArrayList<>();

        int nextNodeIdx = 0;
        int nextCacheIdx = 0;

        long stopTime = System.currentTimeMillis() + GridTestUtils.SF.applyLB(30_000, 5_000);

        int MAX_NODES = 20;
        int MAX_CACHES = 10;

        AtomicBoolean stop = new AtomicBoolean();

        IgniteInternalFuture<?> fut1;

        IgniteInternalFuture<?> fut2;

        try {
            // Started inside try so that the stop flag is raised even if starting the second task fails.
            fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null;
            fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null;

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            while (System.currentTimeMillis() < stopTime) {
                if (!startedNodes.isEmpty() && rnd.nextInt(10) == 0) {
                    boolean startCache = startedCaches.size() < 2 ||
                        (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) != 0);

                    int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));

                    if (startCache) {
                        String cacheName = "cache-" + nextCacheIdx++;

                        log.info("Next, start new cache [cacheName=" + cacheName +
                            ", node=" + nodeIdx +
                            ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                            ", curCaches=" + startedCaches.size() + ']');

                        ignite(nodeIdx).createCache(new CacheConfiguration<>(cacheName));

                        startedCaches.add(cacheName);
                    }
                    else {
                        if (startedCaches.size() > 1) {
                            String cacheName = startedCaches.get(rnd.nextInt(startedCaches.size()));

                            log.info("Next, stop cache [cacheName=" + cacheName +
                                ", node=" + nodeIdx +
                                ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                                ", curCaches=" + startedCaches.size() + ']');

                            ignite(nodeIdx).destroyCache(cacheName);

                            assertTrue(startedCaches.remove(cacheName));
                        }
                    }
                }
                else {
                    boolean startNode = startedNodes.size() < 2 ||
                        (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 0);

                    if (startNode) {
                        int nodeIdx = nextNodeIdx++;

                        log.info("Next, start new node [nodeIdx=" + nodeIdx +
                            ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                            ", curNodes=" + startedNodes.size() + ']');

                        startGrid(nodeIdx);

                        assertTrue(startedNodes.add(nodeIdx));
                    }
                    else {
                        if (startedNodes.size() > 1) {
                            int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));

                            log.info("Next, stop [nodeIdx=" + nodeIdx +
                                ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                                ", curNodes=" + startedNodes.size() + ']');

                            stopGrid(nodeIdx);

                            // Cast selects remove(Object) rather than remove(int index).
                            assertTrue(startedNodes.remove((Integer)nodeIdx));
                        }
                    }
                }

                U.sleep(rnd.nextInt(100) + 1);
            }
        }
        finally {
            stop.set(true);
        }

        if (fut1 != null)
            fut1.get();

        if (fut2 != null)
            fut2.get();
    }

    /**
     * Starts a background task that, until stopped or the stop time passes, sleeps a random time below 2.5
     * seconds, restarts a random ZK server and waits for the ZK cluster to become ready.
     *
     * @param stopTime Stop time.
     * @param stop Stop flag.
     * @return Future.
     */
    private IgniteInternalFuture<?> startRestartZkServers(final long stopTime, final AtomicBoolean stop) {
        return GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                while (!stop.get() && System.currentTimeMillis() < stopTime) {
                    U.sleep(rnd.nextLong(2500));

                    int idx = rnd.nextInt(ZK_SRVS);

                    log.info("Restart ZK server: " + idx);

                    zkCluster.getServers().get(idx).restart();

                    waitForZkClusterReady(zkCluster);
                }

                return null;
            }
        }, "zk-restart-thread");
    }

    /**
     * Starts a background task that, until stopped or the stop time passes, every 50-149 ms closes the
     * ZK client socket of a random running node. Failures to close are logged and ignored.
     *
     * @param stopTime Stop time.
     * @param stop Stop flag.
     * @return Future.
     */
    private IgniteInternalFuture<?> startCloseZkClientSocket(final long stopTime, final AtomicBoolean stop) {
        assert testSockNio;

        return GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                while (!stop.get() && System.currentTimeMillis() < stopTime) {
                    U.sleep(rnd.nextLong(100) + 50);

                    List<Ignite> nodes = G.allGrids();

                    if (!nodes.isEmpty()) {
                        Ignite node = nodes.get(rnd.nextInt(nodes.size()));

                        ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(node);

                        if (nio != null) {
                            info("Close zk client socket for node: " + node.name());

                            try {
                                nio.closeSocket(false);
                            }
                            catch (Exception e) {
                                // Best effort: the node may be stopping concurrently.
                                info("Failed to close zk client socket for node: " + node.name() +
                                    " [err=" + e + ']');
                            }
                        }
                    }
                }

                return null;
            }
        }, "zk-close-socket-thread");
    }

    /**
     * Three times: starts 4 servers and 3 clients with persistence enabled, stops servers 1 and 0 and client 4
     * while checking the topology size, checks events consistency, then stops all nodes.
     *
     * @param dfltConsistenId Default consistent ID flag.
     * @throws Exception If failed.
     */
    private void startWithPersistence(boolean dfltConsistenId) throws Exception {
        this.dfltConsistenId = dfltConsistenId;

        persistence = true;

        for (int i = 0; i < 3; i++) {
            info("Iteration: " + i);

            startGridsMultiThreaded(4, i == 0);

            startClientGridsMultiThreaded(4, 3);

            waitForTopology(7);

            stopGrid(1);

            waitForTopology(6);

            stopGrid(4);

            waitForTopology(5);

            stopGrid(0);

            waitForTopology(4);

            checkEventsConsistency();

            stopAllGrids();

            evts.clear();
        }
    }

    /**
     * Rendezvous affinity function carrying a dummy {@code int} array of configurable size, used to make cache
     * configurations (and the discovery messages that carry them) large.
     */
    @SuppressWarnings("MismatchedReadAndWriteOfArray")
    private static class TestAffinityFunction extends RendezvousAffinityFunction {
        /** */
        private static final long serialVersionUID = 0L;

        /** Dummy payload; never read, only present to increase the object size. */
        private int[] dummyData;

        /**
         * @param dataSize Dummy data size.
         */
        TestAffinityFunction(int dataSize) {
            dummyData = new int[dataSize];

            for (int i = 0; i < dataSize; i++)
                dummyData[i] = i;
        }
    }
}