blob: 09a9433c82bb64987ed378b0506d2469cd8b681d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CommunicationFailureContext;
import org.apache.ignite.configuration.CommunicationFailureResolver;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
import org.junit.Ignore;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * Tests for communication failure resolution in Zookeeper SPI discovery
 * ({@link ZookeeperDiscoverySpi#resolveCommunicationFailure}) with no-op, node-killing and default
 * {@link CommunicationFailureResolver}s, including concurrent resolve requests and the caches information
 * exposed to the resolver through {@link CommunicationFailureContext}.
 */
public class ZookeeperDiscoveryCommunicationFailureTest extends ZookeeperDiscoverySpiTestBase {
    /**
     * Runs the simple no-op resolve scenario on a two-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoOpCommunicationFailureResolve_1() throws Exception {
        communicationFailureResolve_Simple(2);
    }

    /**
     * Runs the simple no-op resolve scenario on a ten-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoOpCommunicationErrorResolve_2() throws Exception {
        communicationFailureResolve_Simple(10);
    }

    /**
     * Starts {@code nodes} nodes with a no-op resolver. Then, for three iterations, requests a communication
     * failure resolve from one random node against another, distinct random node, calling
     * {@code checkInternalStructuresCleanup()} after each round.
     *
     * @param nodes Nodes number; must be greater than 1 so that two distinct nodes can be picked.
     * @throws Exception If failed.
     */
    private void communicationFailureResolve_Simple(int nodes) throws Exception {
        assert nodes > 1;

        sesTimeout = 2000;
        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;

        startGridsMultiThreaded(nodes);

        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (int i = 0; i < 3; i++) {
            info("Iteration: " + i);

            int idx1 = rnd.nextInt(nodes);

            int idx2;

            // The "failed" node must differ from the node requesting the resolve.
            do {
                idx2 = rnd.nextInt(nodes);
            }
            while (idx1 == idx2);

            ZookeeperDiscoverySpi spi = spi(ignite(idx1));

            spi.resolveCommunicationFailure(ignite(idx2).cluster().localNode(), new Exception("test"));

            checkInternalStructuresCleanup();
        }
    }

    /**
     * Tests the case when one node fails before sending its communication status.
     * <p>
     * Node 3 is started with the test ZooKeeper socket ({@code testSockNio}). While a resolve started from
     * node 0 is in progress, that socket is closed and node 3 is stopped. The resolve future must still
     * complete, and three nodes must remain in the topology.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoOpCommunicationErrorResolve_3() throws Exception {
        sesTimeout = 2000;
        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;

        startGridsMultiThreaded(3);

        // Only the node started below uses the test socket and the longer session timeout.
        testSockNio = true;
        sesTimeout = 5000;

        startGrid(3);

        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() {
                ZookeeperDiscoverySpi spi = spi(ignite(0));

                spi.resolveCommunicationFailure(ignite(1).cluster().localNode(), new Exception("test"));

                return null;
            }
        });

        // Give the resolve process time to start before breaking node 3.
        U.sleep(1000);

        ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(ignite(3));

        nio.closeSocket(true);

        try {
            stopGrid(3);

            fut.get();
        }
        finally {
            nio.allowConnect();
        }

        waitForTopology(3);
    }

    /**
     * Tests the case when the coordinator fails while the resolve process is in progress.
     * <p>
     * Node 3's test communication SPI blocks on {@code pingLatch}, which keeps the resolve started from
     * node 1 unfinished until the coordinator (node 0) has been stopped.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoOpCommunicationErrorResolve_4() throws Exception {
        testCommSpi = true;
        sesTimeout = 2000;
        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;

        startGrid(0);

        startGridsMultiThreaded(1, 3);

        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3));

        commSpi.pingLatch = new CountDownLatch(1);

        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() {
                ZookeeperDiscoverySpi spi = spi(ignite(1));

                spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test"));

                return null;
            }
        });

        try {
            U.sleep(1000);

            assertFalse(fut.isDone());

            stopGrid(0);

            commSpi.pingLatch.countDown();

            fut.get();

            waitForTopology(3);
        }
        finally {
            // Always release the blocked ping, even if an assertion above failed, so node 3 does not hang.
            commSpi.pingLatch.countDown();
        }
    }

    /**
     * Tests that node joins are delayed while a resolve is in progress.
     * <p>
     * While node 3's ping is blocked, three more nodes are started concurrently. They must not join (the
     * topology stays at 4 nodes) until the ping is released, after which all 7 nodes form the topology.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoOpCommunicationErrorResolve_5() throws Exception {
        testCommSpi = true;
        sesTimeout = 2000;
        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;

        startGrid(0);

        startGridsMultiThreaded(1, 3);

        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3));

        commSpi.pingStartLatch = new CountDownLatch(1);
        commSpi.pingLatch = new CountDownLatch(1);

        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() {
                ZookeeperDiscoverySpi spi = spi(ignite(1));

                spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test"));

                return null;
            }
        });

        assertTrue(commSpi.pingStartLatch.await(10, SECONDS));

        try {
            assertFalse(fut.isDone());

            final AtomicInteger nodeIdx = new AtomicInteger(3);

            IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    startGrid(nodeIdx.incrementAndGet());

                    return null;
                }
            }, 3, "start-node");

            U.sleep(1000);

            assertFalse(startFut.isDone());

            assertEquals(4, ignite(0).cluster().nodes().size());

            commSpi.pingLatch.countDown();

            startFut.get();
            fut.get();

            waitForTopology(7);
        }
        finally {
            commSpi.pingLatch.countDown();
        }
    }

    /**
     * Resolver kills the non-coordinator node (order 2) in a two-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationErrorResolve_KillNode_1() throws Exception {
        communicationFailureResolve_KillNodes(2, Collections.singleton(2L));
    }

    /**
     * Resolver kills the node with order 2 in a three-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationErrorResolve_KillNode_2() throws Exception {
        communicationFailureResolve_KillNodes(3, Collections.singleton(2L));
    }

    /**
     * Resolver kills several nodes (orders 2, 4, 6) in a ten-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationErrorResolve_KillNode_3() throws Exception {
        communicationFailureResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L));
    }

    /**
     * Resolver kills the first node (order 1) in a two-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationErrorResolve_KillCoordinator_1() throws Exception {
        communicationFailureResolve_KillNodes(2, Collections.singleton(1L));
    }

    /**
     * Resolver kills the first node (order 1) in a three-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationErrorResolve_KillCoordinator_2() throws Exception {
        communicationFailureResolve_KillNodes(3, Collections.singleton(1L));
    }

    /**
     * Resolver kills the first node together with other nodes (orders 1, 4, 6) in a ten-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationErrorResolve_KillCoordinator_3() throws Exception {
        communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L));
    }

    /**
     * Resolver kills the three oldest nodes (orders 1, 2, 3) in a ten-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationErrorResolve_KillCoordinator_4() throws Exception {
        communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L));
    }

    /**
     * Runs a kill-nodes resolve scenario and checks the cluster afterwards.
     * <p>
     * Starts {@code startNodes} nodes with a resolver that kills the nodes with the given orders, then
     * requests a resolve from a surviving node against one of the nodes to be killed. The request is expected
     * to fail with an {@link IgniteSpiException} caused by {@link ClusterTopologyCheckedException}. Afterwards
     * the test checks that exactly the given nodes left the topology and that a new node can still join.
     *
     * @param startNodes Number of nodes to start.
     * @param killNodes Orders of the nodes to kill by the resolve process.
     * @throws Exception If failed.
     */
    private void communicationFailureResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception {
        testCommSpi = true;

        commFailureRslvr = TestNodeKillCommunicationFailureResolver.factory(killNodes);

        startGrids(startNodes);

        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(0));

        commSpi.checkRes = new BitSet(startNodes);

        ZookeeperDiscoverySpi spi = null;
        UUID killNodeId = null;

        // Pick some surviving node to issue the request and some killed node to report as failed.
        for (Ignite node : G.allGrids()) {
            ZookeeperDiscoverySpi spi0 = spi(node);

            if (!killNodes.contains(node.cluster().localNode().order()))
                spi = spi0;
            else
                killNodeId = node.cluster().localNode().id();
        }

        assertNotNull(spi);
        assertNotNull(killNodeId);

        try {
            spi.resolveCommunicationFailure(spi.getNode(killNodeId), new Exception("test"));

            fail("Exception is not thrown");
        }
        catch (IgniteSpiException e) {
            assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
        }

        int expNodes = startNodes - killNodes.size();

        waitForTopology(expNodes);

        for (Ignite node : G.allGrids())
            assertFalse(killNodes.contains(node.cluster().localNode().order()));

        startGrid(startNodes);

        waitForTopology(expNodes + 1);
    }

    /**
     * Repeatedly kills the coordinator and checks that the cluster recovers.
     * <p>
     * In each iteration, node {@code crd + 1} requests a resolve against node {@code crd}, and the resolver
     * kills the first node of the topology snapshot. After the expected failure, the topology must shrink to
     * 9 nodes and then grow back to 10 when a new node is started.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationFailureResolve_KillCoordinator_5() throws Exception {
        sesTimeout = 2000;

        testCommSpi = true;
        commFailureRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY;

        startGrids(10);

        int crd = 0;

        int nodeIdx = 10;

        for (int i = 0; i < GridTestUtils.SF.applyLB(4, 2); i++) {
            info("Iteration: " + i);

            for (Ignite node : G.allGrids())
                ZkTestCommunicationSpi.testSpi(node).initCheckResult(10);

            UUID crdId = ignite(crd).cluster().localNode().id();

            ZookeeperDiscoverySpi spi = spi(ignite(crd + 1));

            try {
                spi.resolveCommunicationFailure(spi.getNode(crdId), new Exception("test"));

                fail("Exception is not thrown");
            }
            catch (IgniteSpiException e) {
                assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
            }

            waitForTopology(9);

            startGrid(nodeIdx++);

            waitForTopology(10);

            crd++;
        }
    }

    /**
     * Repeatedly lets the resolver kill a random subset of nodes.
     * <p>
     * The resolver always keeps at least one server node alive. After each iteration, the test starts a
     * random server or client node and waits for the expected topology size.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10988")
    @Test
    public void testCommunicationFailureResolve_KillRandom() throws Exception {
        sesTimeout = 2000;

        testCommSpi = true;
        commFailureRslvr = KillRandomCommunicationFailureResolver.FACTORY;

        startGridsMultiThreaded(10);

        startClientGridsMultiThreaded(10, 5);

        int nodesCnt = 15;

        waitForTopology(nodesCnt);

        int nodeIdx = 15;

        for (int i = 0; i < GridTestUtils.SF.applyLB(10, 2); i++) {
            info("Iteration: " + i);

            ZookeeperDiscoverySpi spi = null;

            for (Ignite node : G.allGrids()) {
                ZkTestCommunicationSpi.testSpi(node).initCheckResult(100);

                spi = spi(node);
            }

            assertNotNull(spi);

            try {
                spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test"));
            }
            catch (IgniteSpiException ignore) {
                // No-op: the requesting node itself may have been killed by the resolver.
            }

            if (ThreadLocalRandom.current().nextBoolean())
                startClientGrid(nodeIdx++);
            else
                startGrid(nodeIdx++);

            nodesCnt = nodesCnt - KillRandomCommunicationFailureResolver.LAST_KILLED_NODES.size() + 1;

            waitForTopology(nodesCnt);
        }
    }

    /**
     * Default resolver with three server nodes: nodes 0 and 1 report connectivity only to each other, and
     * node 2 only to itself. Node 2 is expected to be removed from the topology.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDefaultCommunicationFailureResolver1() throws Exception {
        testCommSpi = true;
        sesTimeout = 5000;

        startGrids(3);

        // NOTE(review): initCheckResult(nodes, idxs...) presumably marks which node indices are reachable.
        ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(3, 0, 1);
        ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(3, 0, 1);
        ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(3, 2);

        UUID killedId = nodeId(2);

        assertNotNull(ignite(0).cluster().node(killedId));

        ZookeeperDiscoverySpi spi = spi(ignite(0));

        spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test"));

        waitForTopology(2);

        assertNull(ignite(0).cluster().node(killedId));
    }

    /**
     * Default resolver with three servers and two clients split into {0, 1} and {2, 3, 4} connectivity groups.
     * Only two nodes are expected to remain; presumably these are the two-server group {0, 1}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDefaultCommunicationFailureResolver2() throws Exception {
        testCommSpi = true;
        sesTimeout = 5000;

        startGrids(3);

        startClientGridsMultiThreaded(3, 2);

        ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(5, 0, 1);
        ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(5, 0, 1);
        ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(5, 2, 3, 4);
        ZkTestCommunicationSpi.testSpi(ignite(3)).initCheckResult(5, 2, 3, 4);
        ZkTestCommunicationSpi.testSpi(ignite(4)).initCheckResult(5, 2, 3, 4);

        ZookeeperDiscoverySpi spi = spi(ignite(0));

        spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test"));

        waitForTopology(2);
    }

    /**
     * Breaks communication of node 1 in a three-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDefaultCommunicationFailureResolver3() throws Exception {
        defaultCommunicationFailureResolver_BreakCommunication(3, 1);
    }

    /**
     * Breaks communication of node 0 in a three-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDefaultCommunicationFailureResolver4() throws Exception {
        defaultCommunicationFailureResolver_BreakCommunication(3, 0);
    }

    /**
     * Breaks communication of nodes 1, 3 and 6 in a ten-node cluster.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDefaultCommunicationFailureResolver5() throws Exception {
        defaultCommunicationFailureResolver_BreakCommunication(10, 1, 3, 6);
    }

    /**
     * Starts {@code startNodes} nodes and concurrently calls {@link TcpCommunicationSpi#simulateNodeFailure()}
     * on each of {@code breakNodes}, one thread per node released together by a barrier. It then waits until
     * the topology shrinks by the number of broken nodes.
     *
     * @param startNodes Initial nodes number.
     * @param breakNodes Node indices where communication server is closed.
     * @throws Exception If failed.
     */
    private void defaultCommunicationFailureResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception {
        sesTimeout = 5000;

        startGridsMultiThreaded(startNodes);

        final CyclicBarrier b = new CyclicBarrier(breakNodes.length);

        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
            @Override public void apply(Integer threadIdx) {
                try {
                    b.await();

                    int nodeIdx = breakNodes[threadIdx];

                    info("Close communication: " + nodeIdx);

                    ((TcpCommunicationSpi)ignite(nodeIdx).configuration().getCommunicationSpi()).simulateNodeFailure();
                }
                catch (Exception e) {
                    // Keep the original exception as the cause so the failure is diagnosable.
                    throw new AssertionError("Unexpected error: " + e, e);
                }
            }
        }, breakNodes.length, "break-communication");

        waitForTopology(startNodes - breakNodes.length);
    }

    /**
     * Checks the caches information passed to the resolver (started caches, affinity, partition owners).
     * <p>
     * The check is repeated as caches are created, nodes join, a node-filtered cache is added, and the
     * coordinator changes (the new coordinator must provide the same information).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationFailureResolve_CachesInfo1() throws Exception {
        testCommSpi = true;
        sesTimeout = 5000;

        final CacheInfoCommunicationFailureResolver rslvr = new CacheInfoCommunicationFailureResolver();

        commFailureRslvr = new IgniteOutClosure<CommunicationFailureResolver>() {
            @Override public CommunicationFailureResolver apply() {
                return rslvr;
            }
        };

        startGrids(2);

        awaitPartitionMapExchange();

        // Cache name -> (partitions, backups, expected nodes per partition).
        Map<String, T3<Integer, Integer, Integer>> expCaches = new HashMap<>();

        expCaches.put(DEFAULT_CACHE_NAME, new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1));

        checkResolverCachesInfo(ignite(0), expCaches);

        List<CacheConfiguration> caches = new ArrayList<>();

        CacheConfiguration c1 = new CacheConfiguration("c1");
        c1.setBackups(1);
        c1.setAffinity(new RendezvousAffinityFunction(false, 64));
        caches.add(c1);

        CacheConfiguration c2 = new CacheConfiguration("c2");
        c2.setBackups(2);
        c2.setAffinity(new RendezvousAffinityFunction(false, 128));
        caches.add(c2);

        CacheConfiguration c3 = new CacheConfiguration("c3");
        c3.setCacheMode(CacheMode.REPLICATED);
        c3.setAffinity(new RendezvousAffinityFunction(false, 256));
        caches.add(c3);

        ignite(0).createCaches(caches);

        expCaches.put("c1", new T3<>(64, 1, 2));
        expCaches.put("c2", new T3<>(128, 2, 2));
        expCaches.put("c3", new T3<>(256, 1, 2));

        checkResolverCachesInfo(ignite(0), expCaches);

        startGrid(2);
        startGrid(3);

        awaitPartitionMapExchange();

        expCaches.put("c2", new T3<>(128, 2, 3));
        expCaches.put("c3", new T3<>(256, 1, 4));

        checkResolverCachesInfo(ignite(0), expCaches);

        // Node filter excludes nodes 0 and 1, so each partition has a single owner among the remaining nodes.
        CacheConfiguration<Object, Object> c4 = new CacheConfiguration<>("c4");
        c4.setCacheMode(CacheMode.PARTITIONED);
        c4.setBackups(0);
        c4.setAffinity(new RendezvousAffinityFunction(false, 256));
        c4.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1)));

        ignite(2).createCache(c4);

        expCaches.put("c4", new T3<>(256, 0, 1));

        checkResolverCachesInfo(ignite(0), expCaches);

        stopGrid(0); // Stop current coordinator, check new coordinator will initialize required caches information.

        awaitPartitionMapExchange();

        expCaches.put("c3", new T3<>(256, 1, 3));

        checkResolverCachesInfo(ignite(1), expCaches);

        startGrid(0);

        expCaches.put("c3", new T3<>(256, 1, 4));

        checkResolverCachesInfo(ignite(1), expCaches);

        stopGrid(1);

        expCaches.put("c3", new T3<>(256, 1, 3));

        checkResolverCachesInfo(ignite(3), expCaches);
    }

    /**
     * Checks that partition owners passed to the resolver reflect the real state rather than the affinity.
     * <p>
     * Rebalance of cache {@code c1} is blocked, so node 0 remains the only owner of every partition even
     * though {@code c1} has one backup.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationFailureResolve_CachesInfo2() throws Exception {
        testCommSpi = true;
        sesTimeout = 5000;

        final CacheInfoCommunicationFailureResolver rslvr = new CacheInfoCommunicationFailureResolver();

        commFailureRslvr = new IgniteOutClosure<CommunicationFailureResolver>() {
            @Override public CommunicationFailureResolver apply() {
                return rslvr;
            }
        };

        Ignite srv0 = startGrid(0);

        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("c1");
        ccfg.setBackups(1);

        srv0.createCache(ccfg);

        // Block rebalance to make sure node0 will be the only owner.
        TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
            @Override public boolean apply(ClusterNode node, Message msg) {
                return msg instanceof GridDhtPartitionSupplyMessage &&
                    ((GridDhtPartitionSupplyMessage) msg).groupId() == CU.cacheId("c1");
            }
        });

        startGrid(1);

        U.sleep(1000);

        ZookeeperDiscoverySpi spi = spi(srv0);

        rslvr.latch = new CountDownLatch(1);

        ZkTestCommunicationSpi.testSpi(srv0).initCheckResult(2, 0);

        spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test"));

        assertTrue(rslvr.latch.await(10, SECONDS));

        List<List<ClusterNode>> cacheOwners = rslvr.ownersMap.get("c1");

        // Fail with an assertion rather than an NPE if the resolver did not see the cache.
        assertNotNull(cacheOwners);

        ClusterNode node0 = srv0.cluster().localNode();

        for (int p = 0; p < RendezvousAffinityFunction.DFLT_PARTITION_COUNT; p++) {
            List<ClusterNode> owners = cacheOwners.get(p);

            assertEquals(1, owners.size());
            assertEquals(node0, owners.get(0));
        }

        TestRecordingCommunicationSpi.spi(srv0).stopBlock();

        awaitPartitionMapExchange();

        Map<String, T3<Integer, Integer, Integer>> expCaches = new HashMap<>();

        expCaches.put(DEFAULT_CACHE_NAME, new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1));
        expCaches.put("c1", new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 1, 2));

        checkResolverCachesInfo(srv0, expCaches);
    }

    /**
     * Triggers a resolve from {@code crd} and verifies the caches information captured by its resolver.
     * <p>
     * The resolver is assumed to be {@link CacheInfoCommunicationFailureResolver}. The method waits up to
     * 10 seconds for the resolver to be invoked and resets the captured state afterwards.
     *
     * @param crd Coordinator node.
     * @param expCaches Expected caches info: cache name -> (partitions, backups, expected nodes per partition).
     * @throws Exception If failed.
     */
    private void checkResolverCachesInfo(Ignite crd, Map<String, T3<Integer, Integer, Integer>> expCaches)
        throws Exception
    {
        CacheInfoCommunicationFailureResolver rslvr =
            (CacheInfoCommunicationFailureResolver)crd.configuration().getCommunicationFailureResolver();

        assertNotNull(rslvr);

        ZookeeperDiscoverySpi spi = spi(crd);

        rslvr.latch = new CountDownLatch(1);

        ZkTestCommunicationSpi.testSpi(crd).initCheckResult(crd.cluster().nodes().size(), 0);

        spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test"));

        assertTrue(rslvr.latch.await(10, SECONDS));

        rslvr.checkCachesInfo(expCaches);

        rslvr.reset();
    }

    /**
     * Runs resolve requests concurrently with other discovery events.
     * <p>
     * Five threads issue resolve requests while other threads start new nodes, restart node 100, and
     * create/destroy caches. The background activity stops once the resolve threads finish.
     *
     * @throws Exception If failed.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void testCommunicationFailureResolve_ConcurrentDiscoveyEvents() throws Exception {
        sesTimeout = 5000;

        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;

        final int INIT_NODES = 5;

        startGridsMultiThreaded(INIT_NODES);

        // Released together: node start, node restart, cache create and (the first) resolve thread.
        final CyclicBarrier b = new CyclicBarrier(4);

        GridCompoundFuture<?, ?> fut = new GridCompoundFuture<>();

        final AtomicBoolean stop = new AtomicBoolean();

        fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                b.await();

                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                for (int i = 0; i < 10; i++) {
                    startGrid(i + INIT_NODES);

                    //noinspection BusyWait
                    Thread.sleep(rnd.nextLong(1000) + 10);

                    if (stop.get())
                        break;
                }

                return null;
            }
        }, "test-node-start"));

        fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                b.await();

                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                while (!stop.get()) {
                    startGrid(100);

                    //noinspection BusyWait
                    Thread.sleep(rnd.nextLong(1000) + 10);

                    stopGrid(100);

                    //noinspection BusyWait
                    Thread.sleep(rnd.nextLong(1000) + 10);
                }

                return null;
            }
        }, "test-node-restart"));

        fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                b.await();

                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                int idx = 0;

                while (!stop.get()) {
                    CacheConfiguration ccfg = new CacheConfiguration("c-" + idx++);

                    ccfg.setBackups(rnd.nextInt(5));

                    ignite(rnd.nextInt(INIT_NODES)).createCache(ccfg);

                    //noinspection BusyWait
                    Thread.sleep(rnd.nextLong(1000) + 10);

                    ignite(rnd.nextInt(INIT_NODES)).destroyCache(ccfg.getName());

                    //noinspection BusyWait
                    Thread.sleep(rnd.nextLong(1000) + 10);
                }

                return null;
            }
        }, "test-create-cache"));

        fut.add((IgniteInternalFuture)GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                try {
                    b.await();

                    ThreadLocalRandom rnd = ThreadLocalRandom.current();

                    for (int i = 0; i < 5; i++) {
                        info("resolveCommunicationFailure: " + i);

                        ZookeeperDiscoverySpi spi = spi(ignite(rnd.nextInt(INIT_NODES)));

                        spi.resolveCommunicationFailure(ignite(rnd.nextInt(INIT_NODES)).cluster().localNode(),
                            new Exception("test"));
                    }

                    return null;
                }
                finally {
                    stop.set(true);
                }
            }
        }, 5, "test-resolve-failure"));

        fut.markInitialized();

        fut.get();
    }

    /**
     * Issues many concurrent resolve requests.
     * <p>
     * Thirty threads each issue five resolve requests from random nodes of a cluster with five servers and
     * five clients, using the no-op resolver.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationFailureResolve_ConcurrentMultinode() throws Exception {
        sesTimeout = 5000;

        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;

        startGridsMultiThreaded(5);

        startClientGridsMultiThreaded(5, 5);

        final int NODES = 10;

        GridTestUtils.runMultiThreaded(new Callable<Void>() {
            @Override public Void call() {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                for (int i = 0; i < 5; i++) {
                    info("resolveCommunicationFailure: " + i);

                    ZookeeperDiscoverySpi spi = spi(ignite(rnd.nextInt(NODES)));

                    spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test"));
                }

                return null;
            }
        }, 30, "test-resolve-failure");
    }

    /**
     * Resolver that captures the caches information from the context without killing any node.
     * <p>
     * It records started caches, per-partition affinity and per-partition owners, then counts down
     * {@link #latch}. The latch provides the happens-before edge for the test thread reading the
     * captured maps.
     */
    private static class CacheInfoCommunicationFailureResolver implements CommunicationFailureResolver {
        /** */
        @LoggerResource
        private IgniteLogger log;

        /** Started caches captured on the last resolve. */
        Map<String, CacheConfiguration<?, ?>> caches;

        /** Cache name -> per-partition affinity nodes captured on the last resolve. */
        Map<String, List<List<ClusterNode>>> affMap;

        /** Cache name -> per-partition owner nodes captured on the last resolve. */
        Map<String, List<List<ClusterNode>>> ownersMap;

        /** Set by the test before each resolve; counted down once the information is captured. */
        volatile CountDownLatch latch;

        /** {@inheritDoc} */
        @Override public void resolve(CommunicationFailureContext ctx) {
            assert latch != null;
            assert latch.getCount() == 1L : latch.getCount();

            caches = ctx.startedCaches();

            // Assert before dereferencing so a missing value fails as an assertion, not an NPE.
            assertNotNull(caches);

            log.info("Resolver called, started caches: " + caches.keySet());

            affMap = new HashMap<>();
            ownersMap = new HashMap<>();

            for (String cache : caches.keySet()) {
                affMap.put(cache, ctx.cacheAffinity(cache));
                ownersMap.put(cache, ctx.cachePartitionOwners(cache));
            }

            latch.countDown();
        }

        /**
         * Checks the captured information against {@code expCaches}.
         * <p>
         * For each expected cache it checks the configuration (name, backups, partitions count). It also
         * checks that every partition's affinity and owners contain exactly {@code expNodes} nodes and are
         * equal as sets. {@code REPLICATED} caches are expected to report {@link Integer#MAX_VALUE} backups.
         *
         * @param expCaches Expected caches information (when late assignment is done and rebalance finished):
         *      cache name -> (partitions, backups, expected nodes per partition).
         */
        void checkCachesInfo(Map<String, T3<Integer, Integer, Integer>> expCaches) {
            assertNotNull(caches);
            assertNotNull(affMap);
            assertNotNull(ownersMap);

            for (Map.Entry<String, T3<Integer, Integer, Integer>> e : expCaches.entrySet()) {
                String cacheName = e.getKey();

                int parts = e.getValue().get1();
                int backups = e.getValue().get2();
                int expNodes = e.getValue().get3();

                assertTrue(cacheName, caches.containsKey(cacheName));

                CacheConfiguration ccfg = caches.get(cacheName);

                assertEquals(cacheName, ccfg.getName());

                if (ccfg.getCacheMode() == CacheMode.REPLICATED)
                    assertEquals(Integer.MAX_VALUE, ccfg.getBackups());
                else
                    assertEquals(backups, ccfg.getBackups());

                assertEquals(parts, ccfg.getAffinity().partitions());

                List<List<ClusterNode>> aff = affMap.get(cacheName);

                assertNotNull(cacheName, aff);
                assertEquals(parts, aff.size());

                List<List<ClusterNode>> owners = ownersMap.get(cacheName);

                assertNotNull(cacheName, owners);
                assertEquals(parts, owners.size());

                for (int i = 0; i < parts; i++) {
                    List<ClusterNode> partAff = aff.get(i);

                    assertEquals(cacheName, expNodes, partAff.size());

                    List<ClusterNode> partOwners = owners.get(i);

                    assertEquals(cacheName, expNodes, partOwners.size());

                    assertTrue(cacheName, partAff.containsAll(partOwners));
                    assertTrue(cacheName, partOwners.containsAll(partAff));
                }
            }
        }

        /** Clears the captured information before the next check. */
        void reset() {
            caches = null;
            affMap = null;
            ownersMap = null;
        }
    }

    /** Resolver that does nothing. */
    private static class NoOpCommunicationFailureResolver implements CommunicationFailureResolver {
        /** */
        static final IgniteOutClosure<CommunicationFailureResolver> FACTORY
            = (IgniteOutClosure<CommunicationFailureResolver>)NoOpCommunicationFailureResolver::new;

        /** {@inheritDoc} */
        @Override public void resolve(CommunicationFailureContext ctx) {
            // No-op.
        }
    }

    /**
     * Resolver that kills the first node of the topology snapshot.
     * <p>
     * NOTE(review): the first node is presumably the oldest node, i.e. the coordinator; confirm the snapshot
     * ordering.
     */
    private static class KillCoordinatorCommunicationFailureResolver implements CommunicationFailureResolver {
        /** */
        static final IgniteOutClosure<CommunicationFailureResolver> FACTORY
            = (IgniteOutClosure<CommunicationFailureResolver>)KillCoordinatorCommunicationFailureResolver::new;

        /** */
        @LoggerResource
        private IgniteLogger log;

        /** {@inheritDoc} */
        @Override public void resolve(CommunicationFailureContext ctx) {
            List<ClusterNode> nodes = ctx.topologySnapshot();

            ClusterNode node = nodes.get(0);

            log.info("Resolver kills node: " + node.id());

            ctx.killNode(node);
        }
    }

    /**
     * Resolver that kills a random subset of fewer than half of the nodes.
     * <p>
     * At least one server node is always kept alive. The killed nodes are published in
     * {@link #LAST_KILLED_NODES}.
     */
    private static class KillRandomCommunicationFailureResolver implements CommunicationFailureResolver {
        /** */
        static final IgniteOutClosure<CommunicationFailureResolver> FACTORY
            = (IgniteOutClosure<CommunicationFailureResolver>)KillRandomCommunicationFailureResolver::new;

        /** Last killed nodes. */
        static final Set<ClusterNode> LAST_KILLED_NODES = new HashSet<>();

        /** */
        @LoggerResource
        private IgniteLogger log;

        /** {@inheritDoc} */
        @Override public void resolve(CommunicationFailureContext ctx) {
            LAST_KILLED_NODES.clear();

            List<ClusterNode> nodes = ctx.topologySnapshot();

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            // nextInt(bound) requires a positive bound: kill nothing in topologies of fewer than two nodes.
            int maxKill = nodes.size() / 2;

            int killNodes = maxKill > 0 ? rnd.nextInt(maxKill) : 0;

            log.info("Resolver kills nodes [total=" + nodes.size() + ", kill=" + killNodes + ']');

            // Number of server nodes that remain alive; the last one is never selected.
            long srvCnt = nodes.stream().filter(node -> !node.isClient()).count();

            Set<Integer> idxs = new HashSet<>();

            while (idxs.size() < killNodes) {
                int idx = rnd.nextInt(nodes.size());

                if (idxs.contains(idx))
                    continue;

                if (!nodes.get(idx).isClient()) {
                    if (srvCnt <= 1)
                        continue;

                    srvCnt--;
                }

                idxs.add(idx);
            }

            for (int idx : idxs) {
                ClusterNode node = nodes.get(idx);

                log.info("Resolver kills node: " + node.id());

                LAST_KILLED_NODES.add(node);

                ctx.killNode(node);
            }
        }
    }

    /** Resolver that kills every node whose order is in the configured collection. */
    private static class TestNodeKillCommunicationFailureResolver implements CommunicationFailureResolver {
        /**
         * @param killOrders Killed nodes order.
         * @return Factory creating a resolver that kills nodes with the given orders.
         */
        static IgniteOutClosure<CommunicationFailureResolver> factory(final Collection<Long> killOrders) {
            return new IgniteOutClosure<CommunicationFailureResolver>() {
                @Override public CommunicationFailureResolver apply() {
                    return new TestNodeKillCommunicationFailureResolver(killOrders);
                }
            };
        }

        /** Orders of the nodes to kill. */
        final Collection<Long> killNodeOrders;

        /**
         * @param killNodeOrders Killed nodes order.
         */
        TestNodeKillCommunicationFailureResolver(Collection<Long> killNodeOrders) {
            this.killNodeOrders = killNodeOrders;
        }

        /** {@inheritDoc} */
        @Override public void resolve(CommunicationFailureContext ctx) {
            List<ClusterNode> nodes = ctx.topologySnapshot();

            assertFalse(nodes.isEmpty());

            for (ClusterNode node : nodes) {
                if (killNodeOrders.contains(node.order()))
                    ctx.killNode(node);
            }
        }
    }
}