| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.spi.discovery.tcp; |
| |
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.jetbrains.annotations.Nullable;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
import static org.apache.ignite.spi.IgnitePortProtocol.UDP;
| |
| /** |
| * Test for {@link TcpDiscoverySpi}. |
| */ |
| public class TcpDiscoverySelfTest extends GridCommonAbstractTest { |
| /** */ |
| private TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); |
| |
| /** */ |
| private Map<String, TcpDiscoverySpi> discoMap = new HashMap<>(); |
| |
| /** */ |
| private UUID nodeId; |
| |
| /** */ |
| private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>(); |
| |
/**
 * Creates the test, passing {@code false} to the {@link GridCommonAbstractTest} constructor.
 * NOTE(review): presumably this disables automatic grid start, since every test in this class
 * starts its own grids - confirm against GridCommonAbstractTest.
 *
 * @throws Exception If fails.
 */
public TcpDiscoverySelfTest() throws Exception {
    super(false);
}
| |
/**
 * Builds the configuration of the grid named {@code gridName}.
 * <p>
 * If the calling thread placed an SPI into {@code nodeSpi}, that SPI is used and the thread-local
 * is cleared. Otherwise a new SPI is created: a {@code TestTcpDiscoverySpi} for the
 * "testPingInterruptedOnNodeFailedFailingNode" grid and a plain {@link TcpDiscoverySpi} for all
 * other grids. The SPI is then registered in {@code discoMap` and tuned further based on
 * substrings of the grid name.
 *
 * @param gridName Grid name; substrings of it select test-specific settings.
 * @return Grid configuration.
 * @throws Exception If failed.
 */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(gridName);

    // An SPI pre-set on this thread takes precedence and is consumed, so it applies to one grid only.
    TcpDiscoverySpi spi = nodeSpi.get();

    if (spi == null) {
        spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
            new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
    }
    else
        nodeSpi.set(null);

    // Lets tests look up the SPI by grid name.
    discoMap.put(gridName, spi);

    // Shared VM IP finder by default; replaced below for "NonSharedIpFinder" and "MulticastIpFinder" grids.
    spi.setIpFinder(ipFinder);

    spi.setNetworkTimeout(2500);

    spi.setHeartbeatFrequency(1000);

    spi.setMaxMissedHeartbeats(3);

    spi.setIpFinderCleanFrequency(5000);

    spi.setJoinTimeout(5000);

    cfg.setDiscoverySpi(spi);

    // Called with no arguments, i.e. no cache configurations.
    cfg.setCacheConfiguration();

    cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);

    cfg.setIncludeProperties();

    // The loopback tests need one grid that does not use the 127.0.0.1 local host.
    if (!gridName.contains("LoopbackProblemTest"))
        cfg.setLocalHost("127.0.0.1");

    if (gridName.contains("testFailureDetectionOnNodePing")) {
        spi.setReconnectCount(1); // To make test faster: on Windows 1 connect takes 1 second.
        // Rare heartbeats (40 s), so the ping-based failure detection is what these tests observe.
        spi.setHeartbeatFrequency(40000);
    }

    cfg.setConnectorConfiguration(null);

    // Fixed node ID set by a test (see testDuplicateId); otherwise the default from super is kept.
    if (nodeId != null)
        cfg.setNodeId(nodeId);

    if (gridName.contains("NonSharedIpFinder")) {
        // Private (non-shared) finder that only knows 127.0.0.1:47501.
        TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder();

        finder.setAddresses(Arrays.asList("127.0.0.1:47501"));

        spi.setIpFinder(finder);
    }
    else if (gridName.contains("MulticastIpFinder")) {
        // Multicast group and port are taken from GridTestUtils per test class.
        TcpDiscoveryMulticastIpFinder finder = new TcpDiscoveryMulticastIpFinder();

        finder.setAddressRequestAttempts(10);
        finder.setMulticastGroup(GridTestUtils.getNextMulticastGroup(getClass()));
        finder.setMulticastPort(GridTestUtils.getNextMulticastPort(getClass()));

        spi.setIpFinder(finder);

        // Loopback multicast discovery is not working on Mac OS
        // (possibly due to http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7122846).
        if (U.isMacOs())
            spi.setLocalAddress(F.first(U.allLocalIps()));
    }
    else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
        // Long timeout: the ping-interruption tests assert the ping returns in under half of it.
        cfg.setFailureDetectionTimeout(30_000);

    return cfg;
}
| |
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
    // Drop the grid-name -> SPI map before the base-class cleanup.
    // NOTE(review): getConfiguration() would throw NPE on this instance afterwards, so this presumably
    // relies on the runner creating a fresh test instance (re-running the field initializer) per test - confirm.
    discoMap = null;

    super.afterTest();
}
| |
/**
 * Starts and stops a single grid.
 *
 * @throws Exception If any error occurs.
 */
public void testSingleNodeStartStop() throws Exception {
    try {
        startGrid(1);
    }
    finally {
        // Always attempt to stop grid 1, also when startGrid(1) threw.
        stopGrid(1);
    }
}
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testThreeNodesStartStop() throws Exception { |
| try { |
| IgniteEx ignite1 = startGrid(1); |
| IgniteEx ignite2 = startGrid(2); |
| IgniteEx ignite3 = startGrid(3); |
| |
| TcpDiscoverySpi spi1 = (TcpDiscoverySpi)ignite1.configuration().getDiscoverySpi(); |
| TcpDiscoverySpi spi2 = (TcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); |
| TcpDiscoverySpi spi3 = (TcpDiscoverySpi)ignite3.configuration().getDiscoverySpi(); |
| |
| TcpDiscoveryNode node = (TcpDiscoveryNode)spi1.getNode(ignite2.localNode().id()); |
| |
| assertNotNull(node); |
| assertNotNull(node.lastSuccessfulAddress()); |
| |
| LinkedHashSet<InetSocketAddress> addrs = spi1.getNodeAddresses(node); |
| |
| assertEquals(addrs.iterator().next(), node.lastSuccessfulAddress()); |
| |
| assertTrue(spi1.pingNode(ignite3.localNode().id())); |
| |
| node = (TcpDiscoveryNode)spi1.getNode(ignite3.localNode().id()); |
| |
| assertNotNull(node); |
| assertNotNull(node.lastSuccessfulAddress()); |
| |
| addrs = spi1.getNodeAddresses(node); |
| assertEquals(addrs.iterator().next(), node.lastSuccessfulAddress()); |
| |
| node = (TcpDiscoveryNode)spi2.getNode(ignite1.localNode().id()); |
| |
| assertNotNull(node); |
| assertNotNull(node.lastSuccessfulAddress()); |
| |
| node = (TcpDiscoveryNode)spi2.getNode(ignite3.localNode().id()); |
| |
| assertNotNull(node); |
| assertNotNull(node.lastSuccessfulAddress()); |
| |
| node = (TcpDiscoveryNode)spi3.getNode(ignite1.localNode().id()); |
| |
| assertNotNull(node); |
| assertNotNull(node.lastSuccessfulAddress()); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any errors occur. |
| */ |
| public void testNodeConnectMessageSize() throws Exception { |
| try { |
| Ignite g1 = startGrid(1); |
| |
| final AtomicInteger gridNameIdx = new AtomicInteger(1); |
| |
| GridTestUtils.runMultiThreaded(new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| startGrid(gridNameIdx.incrementAndGet()); |
| |
| return null; |
| } |
| }, 4, "grid-starter"); |
| |
| Collection<TcpDiscoveryNode> nodes = ((ServerImpl)discoMap.get(g1.name()).impl).ring().allNodes(); |
| |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| |
| g1.configuration().getMarshaller().marshal(nodes, bos); |
| |
| info(">>> Approximate node connect message size [topSize=" + nodes.size() + |
| ", msgSize=" + bos.size() / 1024.0 + "KB]"); |
| } |
| finally { |
| stopAllGrids(false); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testPing() throws Exception { |
| try { |
| startGrid(1); |
| startGrid(2); |
| startGrid(3); |
| |
| info("Nodes were started"); |
| |
| for (Map.Entry<String, TcpDiscoverySpi> e : discoMap.entrySet()) { |
| DiscoverySpi spi = e.getValue(); |
| |
| for (Ignite g : G.allGrids()) { |
| boolean res = spi.pingNode(g.cluster().localNode().id()); |
| |
| assert res : e.getKey() + " failed to ping " + g.cluster().localNode().id() + " of " + g.name(); |
| |
| info(e.getKey() + " pinged " + g.cluster().localNode().id() + " of " + g.name()); |
| } |
| } |
| |
| info("All nodes pinged successfully."); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testFailureDetectionOnNodePing1() throws Exception { |
| try { |
| Ignite g1 = startGrid("testFailureDetectionOnNodePingCoordinator"); |
| startGrid("testFailureDetectionOnNodePing2"); |
| Ignite g3 = startGrid("testFailureDetectionOnNodePing3"); |
| |
| testFailureDetectionOnNodePing(g1, g3); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testFailureDetectionOnNodePing2() throws Exception { |
| try { |
| startGrid("testFailureDetectionOnNodePingCoordinator"); |
| Ignite g2 = startGrid("testFailureDetectionOnNodePing2"); |
| Ignite g3 = startGrid("testFailureDetectionOnNodePing3"); |
| |
| testFailureDetectionOnNodePing(g3, g2); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testFailureDetectionOnNodePing3() throws Exception { |
| try { |
| Ignite g1 = startGrid("testFailureDetectionOnNodePingCoordinator"); |
| Ignite g2 = startGrid("testFailureDetectionOnNodePing2"); |
| startGrid("testFailureDetectionOnNodePing3"); |
| |
| testFailureDetectionOnNodePing(g2, g1); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception { |
| final CountDownLatch cnt = new CountDownLatch(1); |
| |
| final UUID failedNodeId = failedNode.cluster().localNode().id(); |
| |
| pingingNode.events().localListen( |
| new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| cnt.countDown(); |
| |
| return true; |
| } |
| }, |
| EventType.EVT_NODE_FAILED |
| ); |
| |
| info("Nodes were started"); |
| |
| discoMap.get(failedNode.name()).simulateNodeFailure(); |
| |
| TcpDiscoverySpi spi = discoMap.get(pingingNode.name()); |
| |
| boolean res = spi.pingNode(failedNodeId); |
| |
| assertFalse("Ping is ok for node " + failedNodeId + ", but had to fail.", res); |
| |
| // Heartbeat interval is 40 seconds, but we should detect node failure faster. |
| assert cnt.await(7, SECONDS); |
| } |
| |
/**
 * Checks that a ping of a node is cut short when that node fails. The ping must return {@code false}
 * in less than half of the pinging node's failure detection timeout, and the pinging node must
 * receive {@link EventType#EVT_NODE_FAILED} for the failed node.
 * <p>
 * The failing node's SPI is a {@code TestTcpDiscoverySpi} with {@code ignorePingResponse} set.
 * NOTE(review): presumably it then leaves pings unanswered, so the ping can only end early via the failure.
 *
 * @throws Exception If any error occurs.
 */
public void testPingInterruptedOnNodeFailed() throws Exception {
    try {
        final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
        final Ignite failedNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
        startGrid("testPingInterruptedOnNodeFailedSimpleNode");

        ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true;

        final UUID failedNodeId = failedNode.cluster().localNode().id();

        // Counted down by the ping thread right before it calls pingNode(...).
        final CountDownLatch pingLatch = new CountDownLatch(1);

        // Counted down when the failure event for the failed node arrives.
        final CountDownLatch eventLatch = new CountDownLatch(1);

        final AtomicBoolean pingRes = new AtomicBoolean(true);

        final AtomicBoolean failRes = new AtomicBoolean(false);

        // Reference point for the elapsed-time check below.
        long startTs = System.currentTimeMillis();

        pingingNode.events().localListen(
            new IgnitePredicate<Event>() {
                @Override public boolean apply(Event event) {
                    if (((DiscoveryEvent)event).eventNode().id().equals(failedNodeId)) {
                        failRes.set(true);
                        eventLatch.countDown();
                    }

                    return true;
                }
            },
            EventType.EVT_NODE_FAILED);

        // Thread 1: ping the failing node.
        IgniteInternalFuture<?> pingFut = multithreadedAsync(
            new Callable<Object>() {
                @Override public Object call() throws Exception {
                    pingLatch.countDown();

                    pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
                        failedNodeId));

                    return null;
                }
            }, 1);

        // Thread 2: 3 s after the ping thread signalled, simulate failure of the pinged node.
        IgniteInternalFuture<?> failingFut = multithreadedAsync(
            new Callable<Object>() {
                @Override public Object call() throws Exception {
                    pingLatch.await();

                    Thread.sleep(3000);

                    ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).simulateNodeFailure();

                    return null;
                }
            }, 1);

        failingFut.get();
        pingFut.get();

        assertFalse(pingRes.get());

        // The pinging grid's failure detection timeout is set to 30 s in getConfiguration().
        assertTrue(System.currentTimeMillis() - startTs <
            pingingNode.configuration().getFailureDetectionTimeout() / 2);

        assertTrue(eventLatch.await(7, TimeUnit.SECONDS));
        assertTrue(failRes.get());
    }
    finally {
        stopAllGrids();
    }
}
| |
/**
 * Checks that a ping of a node is cut short when that node is stopped (leaves). The ping must return
 * {@code false} in less than half of the pinging node's failure detection timeout.
 * <p>
 * The grid names are reused from testPingInterruptedOnNodeFailed, so the same configuration applies.
 * NOTE(review): {@code ignorePingResponse} presumably makes the pinged node leave pings unanswered - confirm.
 *
 * @throws Exception If any error occurs.
 */
public void testPingInterruptedOnNodeLeft() throws Exception {
    try {
        final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
        final Ignite leftNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
        startGrid("testPingInterruptedOnNodeFailedSimpleNode");

        ((TestTcpDiscoverySpi)leftNode.configuration().getDiscoverySpi()).ignorePingResponse = true;

        // Counted down by the ping thread right before it calls pingNode(...).
        final CountDownLatch pingLatch = new CountDownLatch(1);

        final AtomicBoolean pingRes = new AtomicBoolean(true);

        // Reference point for the elapsed-time check below.
        long startTs = System.currentTimeMillis();

        // Thread 1: ping the node that is going to leave.
        IgniteInternalFuture<?> pingFut = multithreadedAsync(
            new Callable<Object>() {
                @Override public Object call() throws Exception {
                    pingLatch.countDown();

                    pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
                        leftNode.cluster().localNode().id()));

                    return null;
                }
            }, 1);

        // Thread 2: 3 s after the ping thread signalled, stop the pinged grid (same name as leftNode).
        IgniteInternalFuture<?> stoppingFut = multithreadedAsync(
            new Callable<Object>() {
                @Override public Object call() throws Exception {
                    pingLatch.await();

                    Thread.sleep(3000);

                    stopGrid("testPingInterruptedOnNodeFailedFailingNode");

                    return null;
                }
            }, 1);

        stoppingFut.get();
        pingFut.get();

        assertFalse(pingRes.get());

        // The pinging grid's failure detection timeout is set to 30 s in getConfiguration().
        assertTrue(System.currentTimeMillis() - startTs <
            pingingNode.configuration().getFailureDetectionTimeout() / 2);
    }
    finally {
        stopAllGrids();
    }
}
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testNodeAdded() throws Exception { |
| try { |
| final Ignite g1 = startGrid(1); |
| |
| final CountDownLatch cnt = new CountDownLatch(2); |
| |
| g1.events().localListen( |
| new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| info("Node joined: " + evt.message()); |
| |
| DiscoveryEvent discoEvt = (DiscoveryEvent)evt; |
| |
| TcpDiscoveryNode node = ((TcpDiscoveryNode)discoMap.get(g1.name()). |
| getNode(discoEvt.eventNode().id())); |
| |
| assert node != null && node.visible(); |
| |
| cnt.countDown(); |
| |
| return true; |
| } |
| }, |
| EventType.EVT_NODE_JOINED |
| ); |
| |
| startGrid(2); |
| startGrid(3); |
| |
| info("Nodes were started"); |
| |
| assert cnt.await(1, SECONDS); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testOrdinaryNodeLeave() throws Exception { |
| try { |
| Ignite g1 = startGrid(1); |
| startGrid(2); |
| startGrid(3); |
| |
| final CountDownLatch cnt = new CountDownLatch(2); |
| |
| g1.events().localListen( |
| new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| cnt.countDown(); |
| |
| return true; |
| } |
| }, |
| EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| info("Nodes were started"); |
| |
| stopGrid(3); |
| stopGrid(2); |
| |
| boolean res = cnt.await(1, SECONDS); |
| |
| assert res; |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testCoordinatorNodeLeave() throws Exception { |
| try { |
| startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| final CountDownLatch cnt = new CountDownLatch(1); |
| |
| g2.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| cnt.countDown(); |
| |
| return true; |
| } |
| }, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| |
| info("Nodes were started"); |
| |
| stopGrid(1); |
| |
| assert cnt.await(1, SECONDS); |
| |
| // Start new grid, ensure that added to topology |
| final CountDownLatch cnt2 = new CountDownLatch(1); |
| |
| g2.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| cnt2.countDown(); |
| |
| return true; |
| } |
| }, EVT_NODE_JOINED); |
| |
| startGrid(3); |
| |
| assert cnt2.await(1, SECONDS); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testOrdinaryNodeFailure() throws Exception { |
| try { |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| Ignite g3 = startGrid(3); |
| |
| final CountDownLatch cnt = new CountDownLatch(2); |
| |
| g1.events().localListen( |
| new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| cnt.countDown(); |
| |
| return true; |
| } |
| }, |
| EventType.EVT_NODE_FAILED |
| ); |
| |
| info("Nodes were started"); |
| |
| discoMap.get(g2.name()).simulateNodeFailure(); |
| discoMap.get(g3.name()).simulateNodeFailure(); |
| |
| assert cnt.await(25, SECONDS); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testCoordinatorNodeFailure() throws Exception { |
| try { |
| Ignite g1 = startGrid(1); |
| Ignite g2 = startGrid(2); |
| |
| final CountDownLatch cnt = new CountDownLatch(1); |
| |
| g2.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| cnt.countDown(); |
| |
| return true; |
| } |
| }, EventType.EVT_NODE_FAILED); |
| |
| info("Nodes were started"); |
| |
| discoMap.get(g1.name()).simulateNodeFailure(); |
| |
| assert cnt.await(20, SECONDS); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testMetricsSending() throws Exception { |
| final AtomicBoolean stopping = new AtomicBoolean(); |
| |
| try { |
| final CountDownLatch latch1 = new CountDownLatch(1); |
| |
| final Ignite g1 = startGrid(1); |
| |
| IgnitePredicate<Event> lsnr1 = new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| info(evt.message()); |
| |
| latch1.countDown(); |
| |
| return true; |
| } |
| }; |
| |
| g1.events().localListen(lsnr1, EVT_NODE_METRICS_UPDATED); |
| |
| assert latch1.await(10, SECONDS); |
| |
| g1.events().stopLocalListen(lsnr1); |
| |
| final CountDownLatch latch1_1 = new CountDownLatch(1); |
| final CountDownLatch latch1_2 = new CountDownLatch(1); |
| final CountDownLatch latch2_1 = new CountDownLatch(1); |
| final CountDownLatch latch2_2 = new CountDownLatch(1); |
| |
| final Ignite g2 = startGrid(2); |
| |
| g2.events().localListen( |
| new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (stopping.get()) |
| return true; |
| |
| info(evt.message()); |
| |
| UUID id = ((DiscoveryEvent) evt).eventNode().id(); |
| |
| if (id.equals(g1.cluster().localNode().id())) |
| latch2_1.countDown(); |
| else if (id.equals(g2.cluster().localNode().id())) |
| latch2_2.countDown(); |
| else |
| assert false : "Event fired for unknown node."; |
| |
| return true; |
| } |
| }, |
| EVT_NODE_METRICS_UPDATED |
| ); |
| |
| g1.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (stopping.get()) |
| return true; |
| |
| info(evt.message()); |
| |
| UUID id = ((DiscoveryEvent) evt).eventNode().id(); |
| |
| if (id.equals(g1.cluster().localNode().id())) |
| latch1_1.countDown(); |
| else if (id.equals(g2.cluster().localNode().id())) |
| latch1_2.countDown(); |
| else |
| assert false : "Event fired for unknown node."; |
| |
| return true; |
| } |
| }, EVT_NODE_METRICS_UPDATED); |
| |
| assert latch1_1.await(10, SECONDS); |
| assert latch1_2.await(10, SECONDS); |
| assert latch2_1.await(10, SECONDS); |
| assert latch2_2.await(10, SECONDS); |
| } |
| finally { |
| stopping.set(true); |
| |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testFailBeforeNodeAddedSent() throws Exception { |
| try { |
| Ignite g1 = startGrid(1); |
| |
| final CountDownLatch joinCnt = new CountDownLatch(2); |
| final CountDownLatch failCnt = new CountDownLatch(1); |
| |
| g1.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (evt.type() == EVT_NODE_JOINED) |
| joinCnt.countDown(); |
| else if (evt.type() == EVT_NODE_FAILED) |
| failCnt.countDown(); |
| else |
| assert false : "Unexpected event type: " + evt; |
| |
| return true; |
| } |
| }, EVT_NODE_JOINED, EVT_NODE_FAILED); |
| |
| final Ignite g = startGrid("FailBeforeNodeAddedSentSpi"); |
| |
| discoMap.get(g.name()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() { |
| @Override public void apply(TcpDiscoveryAbstractMessage msg) { |
| if (msg instanceof TcpDiscoveryNodeAddedMessage) { |
| discoMap.get(g.name()).simulateNodeFailure(); |
| |
| throw new RuntimeException("Avoid message sending: " + msg.getClass()); |
| } |
| } |
| }); |
| |
| startGrid(3); |
| |
| assert joinCnt.await(10, SECONDS); |
| assert failCnt.await(10, SECONDS); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testFailBeforeNodeLeftSent() throws Exception { |
| try { |
| startGrid(1); |
| startGrid(2); |
| |
| final Ignite g = startGrid("FailBeforeNodeLeftSentSpi"); |
| |
| discoMap.get(g.name()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() { |
| @Override public void apply(TcpDiscoveryAbstractMessage msg) { |
| if (msg instanceof TcpDiscoveryNodeLeftMessage) { |
| discoMap.get(g.name()).simulateNodeFailure(); |
| |
| throw new RuntimeException("Avoid message sending: " + msg.getClass()); |
| } |
| } |
| }); |
| |
| Ignite g3 = startGrid(3); |
| |
| final CountDownLatch cnt = new CountDownLatch(1); |
| |
| g3.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| cnt.countDown(); |
| |
| return true; |
| } |
| }, EVT_NODE_FAILED); |
| |
| stopGrid(1); |
| |
| assert cnt.await(20, SECONDS); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testIpFinderCleaning() throws Exception { |
| try { |
| ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024), |
| new InetSocketAddress("1.1.1.2", 1024))); |
| |
| Ignite g1 = startGrid(1); |
| |
| long failureDetectTimeout = g1.configuration().getFailureDetectionTimeout(); |
| |
| long timeout = (long)(discoMap.get(g1.name()).getIpFinderCleanFrequency() * 1.5) + failureDetectTimeout; |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return ipFinder.getRegisteredAddresses().size() == 1; |
| } |
| }, timeout); |
| |
| if (ipFinder.getRegisteredAddresses().size() != 1) { |
| log.error("Failed to wait for IP cleanup, will dump threads."); |
| |
| U.dumpThreads(log); |
| } |
| |
| assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses(); |
| |
| // Check that missing addresses are returned back. |
| ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister valid address. |
| |
| ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024), |
| new InetSocketAddress("1.1.1.2", 1024))); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return ipFinder.getRegisteredAddresses().size() == 1; |
| } |
| }, timeout); |
| |
| assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses(); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testNonSharedIpFinder() throws Exception { |
| try { |
| GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| Thread.sleep(4000); |
| |
| return startGrid("NonSharedIpFinder-2"); |
| } |
| }, 1, "grid-starter"); |
| |
| // This node should wait until any node "from ipFinder" appears, see log messages. |
| Ignite g = startGrid("NonSharedIpFinder-1"); |
| |
| assert g.cluster().localNode().order() == 2; |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testMulticastIpFinder() throws Exception { |
| try { |
| for (int i = 0; i < 5; i++) { |
| Ignite g = startGrid("MulticastIpFinder-" + i); |
| |
| assertEquals(i + 1, g.cluster().nodes().size()); |
| |
| TcpDiscoverySpi spi = (TcpDiscoverySpi)g.configuration().getDiscoverySpi(); |
| |
| TcpDiscoveryMulticastIpFinder ipFinder = (TcpDiscoveryMulticastIpFinder)spi.getIpFinder(); |
| |
| boolean found = false; |
| |
| for (GridPortRecord rec : ((IgniteKernal) g).context().ports().records()) { |
| if ((rec.protocol() == UDP) && rec.port() == ipFinder.getMulticastPort()) { |
| found = true; |
| |
| break; |
| } |
| } |
| |
| assertTrue("TcpDiscoveryMulticastIpFinder should register port." , found); |
| } |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testInvalidAddressIpFinder() throws Exception { |
| ipFinder.setShared(false); |
| |
| ipFinder.setAddresses(Collections.singletonList("some-host")); |
| |
| try { |
| GridTestUtils.assertThrows( |
| log, |
| new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| startGrid(1); |
| |
| return null; |
| } |
| }, |
| IgniteCheckedException.class, |
| null); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testJoinTimeout() throws Exception { |
| try { |
| // This start will fail as expected. |
| Throwable t = GridTestUtils.assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| startGrid("NonSharedIpFinder-1"); |
| |
| return null; |
| } |
| }, IgniteCheckedException.class, null); |
| |
| assert X.hasCause(t, IgniteSpiException.class) : "Unexpected exception: " + t; |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| public void testDirtyIpFinder() throws Exception { |
| try { |
| // Dirty IP finder |
| for (int i = 47500; i < 47520; i++) |
| ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("127.0.0.1", i), |
| new InetSocketAddress("unknown-host", i))); |
| |
| assert ipFinder.isShared(); |
| |
| startGrid(1); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testDuplicateId() throws Exception { |
| try { |
| // Random ID. |
| startGrid(1); |
| |
| nodeId = UUID.randomUUID(); |
| |
| startGrid(2); |
| |
| // Duplicate ID. |
| GridTestUtils.assertThrows( |
| log, |
| new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| // Exception will be thrown and output to log. |
| startGrid(3); |
| |
| return null; |
| } |
| }, |
| IgniteCheckedException.class, |
| null); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testLoopbackProblemFirstNodeOnLoopback() throws Exception { |
| // On Windows and Mac machines two nodes can reside on the same port |
| // (if one node has localHost="127.0.0.1" and another has localHost="0.0.0.0"). |
| // So two nodes do not even discover each other. |
| if (U.isWindows() || U.isMacOs()) |
| return; |
| |
| try { |
| startGridNoOptimize(1); |
| |
| GridTestUtils.assertThrows( |
| log, |
| new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| // Exception will be thrown because we start node which does not use loopback address, |
| // but the first node does. |
| startGridNoOptimize("LoopbackProblemTest"); |
| |
| return null; |
| } |
| }, |
| IgniteException.class, |
| null); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testLoopbackProblemSecondNodeOnLoopback() throws Exception { |
| if (U.isWindows() || U.isMacOs()) |
| return; |
| |
| try { |
| startGridNoOptimize("LoopbackProblemTest"); |
| |
| GridTestUtils.assertThrows( |
| log, |
| new Callable<Object>() { |
| @Nullable @Override public Object call() throws Exception { |
| // Exception will be thrown because we start node which uses loopback address, |
| // but the first node does not. |
| startGridNoOptimize(1); |
| |
| return null; |
| } |
| }, |
| IgniteException.class, |
| null); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If any error occurs. |
| */ |
| public void testGridStartTime() throws Exception { |
| try { |
| startGridsMultiThreaded(5); |
| |
| Long startTime = null; |
| |
| IgniteKernal firstGrid = null; |
| |
| Collection<IgniteKernal> grids = new ArrayList<>(); |
| |
| for (int i = 0; i < 5 ; i++) { |
| IgniteKernal grid = (IgniteKernal)grid(i); |
| |
| assertTrue(grid.context().discovery().gridStartTime() > 0); |
| |
| if (i > 0) |
| assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); |
| else |
| startTime = grid.context().discovery().gridStartTime(); |
| |
| if (grid.localNode().order() == 1) |
| firstGrid = grid; |
| else |
| grids.add(grid); |
| } |
| |
| assertNotNull(firstGrid); |
| |
| stopGrid(firstGrid.name()); |
| |
| for (IgniteKernal grid : grids) |
| assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); |
| |
| grids.add((IgniteKernal)startGrid(5)); |
| |
| for (IgniteKernal grid : grids) |
| assertEquals(startTime, (Long)grid.context().discovery().gridStartTime()); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed |
| */ |
| public void testCustomEventRace1_1() throws Exception { |
| try { |
| customEventRace1(true, false); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed |
| */ |
| public void testCustomEventRace1_2() throws Exception { |
| try { |
| customEventRace1(false, false); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed |
| */ |
| public void testCustomEventRace1_3() throws Exception { |
| try { |
| customEventRace1(true, true); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @param cacheStartFrom1 If {code true} starts cache from node1. |
| * @param stopCrd If {@code true} stops coordinator. |
| * @throws Exception If failed |
| */ |
| private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception { |
| TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi(); |
| |
| nodeSpi.set(spi0); |
| |
| final Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TestCustomEventRaceSpi()); |
| |
| final Ignite ignite1 = startGrid(1); |
| |
| CountDownLatch latch1 = new CountDownLatch(1); |
| CountDownLatch latch2 = new CountDownLatch(1); |
| |
| spi0.nodeAdded1 = latch1; |
| spi0.nodeAdded2 = latch2; |
| spi0.debug = true; |
| |
| IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| log.info("Start 2"); |
| |
| nodeSpi.set(new TestCustomEventRaceSpi()); |
| |
| Ignite ignite2 = startGrid(2); |
| |
| return null; |
| } |
| }); |
| |
| latch1.await(); |
| |
| final String CACHE_NAME = "cache"; |
| |
| IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| CacheConfiguration ccfg = new CacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME); |
| |
| Ignite ignite = cacheStartFrom1 ? ignite1 : ignite0; |
| |
| ignite.createCache(ccfg); |
| |
| return null; |
| } |
| }); |
| |
| if (stopCrd) { |
| spi0.stop = true; |
| |
| latch2.countDown(); |
| |
| ignite0.close(); |
| } |
| else { |
| U.sleep(500); |
| |
| latch2.countDown(); |
| } |
| |
| fut1.get(); |
| fut2.get(); |
| |
| IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME); |
| |
| assertNotNull(cache); |
| |
| cache.put(1, 1); |
| |
| assertEquals(1, cache.get(1)); |
| |
| nodeSpi.set(new TestCustomEventRaceSpi()); |
| |
| Ignite ignite = startGrid(3); |
| |
| cache = ignite.cache(CACHE_NAME); |
| |
| cache.put(2, 2); |
| |
| assertEquals(1, cache.get(1)); |
| assertEquals(2, cache.get(2)); |
| } |
| |
| /** |
| * @throws Exception If failed |
| */ |
| public void testCustomEventCoordinatorFailure1() throws Exception { |
| try { |
| customEventCoordinatorFailure(true); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed |
| */ |
| public void testCustomEventCoordinatorFailure2() throws Exception { |
| try { |
| customEventCoordinatorFailure(false); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed |
| */ |
| public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception { |
| try { |
| TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1(); |
| |
| nodeSpi.set(spi0); |
| |
| final Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TcpDiscoverySpi()); |
| |
| Ignite ignite1 = startGrid(1); |
| |
| final AtomicBoolean disconnected = new AtomicBoolean(); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| final UUID failedNodeId = ignite0.cluster().localNode().id(); |
| |
| ignite1.events().localListen(new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (evt.type() == EventType.EVT_NODE_FAILED && |
| failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id())) |
| disconnected.set(true); |
| |
| latch.countDown(); |
| |
| return false; |
| } |
| }, EventType.EVT_NODE_FAILED); |
| |
| spi0.stop = true; |
| |
| latch.await(15, TimeUnit.SECONDS); |
| |
| assertTrue(disconnected.get()); |
| |
| try { |
| ignite0.cluster().localNode().id(); |
| } |
| catch (IllegalStateException e) { |
| if (e.getMessage().contains("Grid is in invalid state to perform this operation")) |
| return; |
| } |
| |
| fail(); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| /** |
| * @throws Exception If failed |
| */ |
| public void testNodeShutdownOnRingMessageWorkerStartNotFinished() throws Exception { |
| try { |
| Ignite ignite0 = startGrid(0); |
| |
| TestMessageWorkerFailureSpi2 spi0 = new TestMessageWorkerFailureSpi2(); |
| |
| nodeSpi.set(spi0); |
| |
| try { |
| startGrid(1); |
| |
| fail(); |
| } |
| catch (Exception e) { |
| log.error("Expected error: " + e, e); |
| } |
| |
| Ignite ignite1 = startGrid(1); |
| |
| assertEquals(2, ignite1.cluster().nodes().size()); |
| assertEquals(4, ignite1.cluster().topologyVersion()); |
| |
| assertEquals(2, ignite0.cluster().nodes().size()); |
| assertEquals(4, ignite0.cluster().topologyVersion()); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| |
| /** |
| * @param twoNodes If {@code true} starts two nodes, otherwise three. |
| * @throws Exception If failed |
| */ |
| private void customEventCoordinatorFailure(boolean twoNodes) throws Exception { |
| TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi(); |
| |
| nodeSpi.set(spi0); |
| |
| Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); |
| |
| Ignite ignite1 = startGrid(1); |
| |
| nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); |
| |
| Ignite ignite2 = twoNodes ? null : startGrid(2); |
| |
| final Ignite createCacheNode = ignite2 != null ? ignite2 : ignite1; |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| |
| spi0.latch = latch; |
| |
| final String CACHE_NAME = "test-cache"; |
| |
| IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| log.info("Create test cache"); |
| |
| CacheConfiguration ccfg = new CacheConfiguration(); |
| |
| ccfg.setName(CACHE_NAME); |
| |
| createCacheNode.createCache(ccfg); |
| |
| return null; |
| } |
| }, "create-cache-thread"); |
| |
| ((TcpCommunicationSpi)ignite0.configuration().getCommunicationSpi()).simulateNodeFailure(); |
| |
| latch.await(); |
| |
| ignite0.close(); |
| |
| fut.get(); |
| |
| IgniteCache<Object, Object> cache = grid(1).cache(CACHE_NAME); |
| |
| assertNotNull(cache); |
| |
| cache.put(1, 1); |
| |
| assertEquals(1, cache.get(1)); |
| |
| log.info("Try start one more node."); |
| |
| nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); |
| |
| Ignite ignite = startGrid(twoNodes ? 2 : 3); |
| |
| cache = ignite.cache(CACHE_NAME); |
| |
| assertNotNull(cache); |
| |
| cache.put(2, 2); |
| |
| assertEquals(1, cache.get(1)); |
| assertEquals(2, cache.get(2)); |
| } |
| |
| /** |
| * Coordinator is added in failed list during node start. |
| * |
| * @throws Exception If failed. |
| */ |
| public void testFailedNodes1() throws Exception { |
| try { |
| final int FAIL_ORDER = 3; |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| final Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| startGrid(1); |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| Ignite ignite2 = startGrid(2); |
| |
| assertEquals(2, ignite2.cluster().nodes().size()); |
| |
| waitNodeStop(ignite0.name()); |
| |
| tryCreateCache(2); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * Coordinator is added in failed list, concurrent nodes start. |
| * |
| * @throws Exception If failed. |
| */ |
| public void testFailedNodes2() throws Exception { |
| try { |
| final int FAIL_ORDER = 3; |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| startGrid(1); |
| |
| final AtomicInteger nodeIdx = new AtomicInteger(1); |
| |
| GridTestUtils.runMultiThreaded(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| int idx = nodeIdx.incrementAndGet(); |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| startGrid(idx); |
| |
| return null; |
| } |
| }, 3, "start-node"); |
| |
| Ignite ignite2 = ignite(2); |
| |
| waitForRemoteNodes(ignite2, 3); |
| |
| waitNodeStop(ignite0.name()); |
| |
| tryCreateCache(4); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * Coordinator is added in failed list during node start, test with two nodes. |
| * |
| * @throws Exception If failed. |
| */ |
| public void testFailedNodes3() throws Exception { |
| try { |
| nodeSpi.set(new TestFailedNodesSpi(-1)); |
| |
| Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TestFailedNodesSpi(2)); |
| |
| Ignite ignite1 = startGrid(1); |
| |
| assertEquals(1, ignite1.cluster().nodes().size()); |
| |
| waitNodeStop(ignite0.name()); |
| |
| ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1); |
| |
| startGrid(2); |
| |
| assertEquals(2, ignite1.cluster().nodes().size()); |
| |
| tryCreateCache(2); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * Coordinator is added in failed list during node start, but node detected failure dies before |
| * sending {@link TcpDiscoveryNodeFailedMessage}. |
| * |
| * @throws Exception If failed. |
| */ |
| public void testFailedNodes4() throws Exception { |
| try { |
| final int FAIL_ORDER = 3; |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| final Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); |
| |
| Ignite ignite1 = startGrid(1); |
| |
| TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER); |
| |
| spi.stopBeforeSndFail = true; |
| |
| nodeSpi.set(spi); |
| |
| Ignite ignite2 = startGrid(2); |
| |
| waitNodeStop(ignite2.name()); |
| |
| log.info("Try start new node."); |
| |
| Ignite ignite3 = startGrid(3); |
| |
| waitNodeStop(ignite0.name()); |
| |
| assertEquals(2, ignite1.cluster().nodes().size()); |
| assertEquals(2, ignite3.cluster().nodes().size()); |
| |
| tryCreateCache(2); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * Adds some node in failed list after join process finished. |
| * |
| * @throws Exception If failed. |
| */ |
| public void testFailedNodes5() throws Exception { |
| try { |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| for (int iter = 0; iter < 3; iter++) { |
| final int NODES = iter == 0 ? 2 : rnd.nextInt(3, 6); |
| |
| for (int i = 0; i < NODES; i++) { |
| nodeSpi.set(new TestFailedNodesSpi(-1)); |
| |
| startGrid(i); |
| } |
| |
| Map<Long, Ignite> nodes = new HashMap<>(); |
| |
| for (int i = 0; i < NODES; i++) { |
| Ignite ignite = ignite(i); |
| |
| nodes.put(ignite.cluster().localNode().order(), ignite); |
| } |
| |
| Ignite ignite = ignite(rnd.nextInt(NODES)); |
| |
| log.info("Iteration [iter=" + iter + ", nodes=" + NODES + ", failFrom=" + ignite.name() + ']'); |
| |
| TestFailedNodesSpi spi = (TestFailedNodesSpi)ignite.configuration().getDiscoverySpi(); |
| |
| spi.failSingleMsg = true; |
| |
| long order = ignite.cluster().localNode().order(); |
| |
| long nextOrder = order == NODES ? 1 : order + 1; |
| |
| Ignite failingNode = nodes.get(nextOrder); |
| |
| assertNotNull(failingNode); |
| |
| waitNodeStop(failingNode.name()); |
| |
| Ignite newNode = startGrid(NODES); |
| |
| assertEquals(NODES, newNode.cluster().nodes().size()); |
| |
| tryCreateCache(NODES); |
| |
| stopAllGrids(); |
| } |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| public void testCustomEventAckNotSend() throws Exception { |
| try { |
| TestCustomerEventAckSpi spi0 = new TestCustomerEventAckSpi(); |
| |
| nodeSpi.set(spi0); |
| |
| Ignite ignite0 = startGrid(0); |
| |
| nodeSpi.set(new TestCustomerEventAckSpi()); |
| |
| Ignite ignite1 = startGrid(1); |
| |
| spi0.stopBeforeSndAck = true; |
| |
| ignite1.message().remoteListen("test", new DummyPredicate()); |
| |
| waitNodeStop(ignite0.name()); |
| |
| startGrid(2); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| public void testDiscoveryEventsDiscard() throws Exception { |
| try { |
| TestEventDiscardSpi spi = new TestEventDiscardSpi(); |
| |
| nodeSpi.set(spi); |
| |
| Ignite ignite0 = startGrid(0); |
| |
| startGrid(1); |
| |
| ignite0.createCache(new CacheConfiguration<>()); // Send custom message. |
| |
| ignite0.destroyCache(null); // Send custom message. |
| |
| stopGrid(1); |
| |
| log.info("Start new node."); |
| |
| spi.checkDuplicates = true; |
| |
| startGrid(1); |
| |
| spi.checkDuplicates = false; |
| |
| assertFalse(spi.failed); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @param nodeName Node name. |
| * @throws Exception If failed. |
| */ |
| private void waitNodeStop(final String nodeName) throws Exception { |
| boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| try { |
| Ignition.ignite(nodeName); |
| |
| return false; |
| } |
| catch (IgniteIllegalStateException e) { |
| return true; |
| } |
| } |
| }, 10_000); |
| |
| if (!wait) |
| U.dumpThreads(log); |
| |
| assertTrue("Failed to wait for node stop.", wait); |
| } |
| |
| /** |
| * @param expNodes Expected nodes number. |
| */ |
| private void tryCreateCache(int expNodes) { |
| List<Ignite> allNodes = G.allGrids(); |
| |
| assertEquals(expNodes, allNodes.size()); |
| |
| int cntr = 0; |
| |
| for (Ignite ignite : allNodes) { |
| CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); |
| |
| ccfg.setName("cache-" + cntr++); |
| |
| log.info("Try create cache [node=" + ignite.name() + ", cache=" + ccfg.getName() + ']'); |
| |
| ignite.getOrCreateCache(ccfg).put(1, 1); |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class DummyPredicate implements IgniteBiPredicate<UUID, Object> { |
| /** {@inheritDoc} */ |
| @Override public boolean apply(UUID uuid, Object o) { |
| return true; |
| } |
| } |
| |
| |
| /** |
| * |
| */ |
| private static class TestEventDiscardSpi extends TcpDiscoverySpi { |
| /** */ |
| private ConcurrentHashSet<IgniteUuid> msgIds = new ConcurrentHashSet<>(); |
| |
| /** */ |
| private volatile boolean checkDuplicates; |
| |
| /** */ |
| private volatile boolean failed; |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, |
| TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| boolean add = msgIds.add(msg.id()); |
| |
| if (checkDuplicates && !add) { |
| log.error("Send duplicated message: " + msg); |
| |
| failed = true; |
| } |
| |
| super.writeToSocket(sock, msg, timeout); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { |
| /** */ |
| private volatile boolean stopBeforeSndAck; |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, |
| TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| if (stopBeforeSndAck) { |
| if (msg instanceof TcpDiscoveryCustomEventMessage) { |
| try { |
| DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue( |
| ((TcpDiscoveryCustomEventMessage)msg).message(marsh), "delegate"); |
| |
| if (custMsg instanceof StartRoutineAckDiscoveryMessage) { |
| log.info("Skip message send and stop node: " + msg); |
| |
| sock.close(); |
| |
| GridTestUtils.runAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| ignite.close(); |
| |
| return null; |
| } |
| }, "stop-node"); |
| |
| return; |
| } |
| } |
| catch (Throwable e) { |
| fail("Unexpected error: " + e); |
| } |
| } |
| } |
| |
| super.writeToSocket(sock, msg, timeout); |
| } |
| } |
| |
| /** |
| * Simulate scenario when node detects node failure trying to send message, but node still alive. |
| */ |
| private static class TestFailedNodesSpi extends TcpDiscoverySpi { |
| /** */ |
| private AtomicBoolean failMsg = new AtomicBoolean(); |
| |
| /** */ |
| private int failOrder; |
| |
| /** */ |
| private boolean stopBeforeSndFail; |
| |
| /** */ |
| private boolean stop; |
| |
| /** */ |
| private volatile boolean failSingleMsg; |
| |
| /** |
| * @param failOrder Spi fails connection if local node order equals to this order. |
| */ |
| TestFailedNodesSpi(int failOrder) { |
| this.failOrder = failOrder; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, |
| TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| if (stop) |
| return; |
| |
| if (failSingleMsg) { |
| failSingleMsg = false; |
| |
| log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); |
| |
| sock.close(); |
| |
| throw new SocketTimeoutException(); |
| } |
| |
| if (locNode.internalOrder() == failOrder && |
| (msg instanceof TcpDiscoveryNodeAddedMessage) && |
| failMsg.compareAndSet(false, true)) { |
| log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); |
| |
| sock.close(); |
| |
| throw new SocketTimeoutException(); |
| } |
| |
| if (stopBeforeSndFail && |
| locNode.internalOrder() == failOrder && |
| (msg instanceof TcpDiscoveryNodeFailedMessage)) { |
| stop = true; |
| |
| log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']'); |
| |
| sock.close(); |
| |
| GridTestUtils.runAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| ignite.close(); |
| |
| return null; |
| } |
| }, "stop-node"); |
| |
| return; |
| } |
| |
| super.writeToSocket(sock, msg, timeout); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi { |
| /** */ |
| private volatile CountDownLatch latch; |
| |
| /** */ |
| private boolean stop; |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) { |
| log.info("Stop node on custom event: " + msg); |
| |
| latch.countDown(); |
| |
| stop = true; |
| } |
| |
| if (stop) |
| return; |
| |
| super.writeToSocket(sock, msg, timeout); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TestCustomEventRaceSpi extends TcpDiscoverySpi { |
| /** */ |
| private volatile CountDownLatch nodeAdded1; |
| |
| /** */ |
| private volatile CountDownLatch nodeAdded2; |
| |
| /** */ |
| private volatile boolean stop; |
| |
| /** */ |
| private boolean debug; |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| if (msg instanceof TcpDiscoveryNodeAddedMessage) { |
| if (nodeAdded1 != null) { |
| nodeAdded1.countDown(); |
| |
| if (debug) |
| log.info("--- Wait node added: " + msg); |
| |
| U.await(nodeAdded2); |
| |
| nodeAdded1 = null; |
| nodeAdded2 = null; |
| } |
| |
| if (stop) |
| return; |
| |
| if (debug) |
| log.info("--- Send node added: " + msg); |
| } |
| |
| if (debug && msg instanceof TcpDiscoveryNodeAddFinishedMessage) |
| log.info("--- Send node finished: " + msg); |
| |
| if (debug && msg instanceof TcpDiscoveryCustomEventMessage) |
| log.info("--- Send custom event: " + msg); |
| |
| super.writeToSocket(sock, msg, timeout); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi { |
| /** */ |
| private volatile boolean stop; |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| |
| if (stop) |
| throw new RuntimeException("Failing ring message worker explicitly"); |
| |
| super.writeToSocket(sock, msg, timeout); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TestMessageWorkerFailureSpi2 extends TcpDiscoverySpi { |
| /** */ |
| private volatile boolean stop; |
| |
| /** {@inheritDoc} */ |
| @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, |
| long timeout) throws IOException, IgniteCheckedException { |
| if (stop) |
| throw new RuntimeException("Failing ring message worker explicitly"); |
| |
| super.writeToSocket(sock, msg, timeout); |
| |
| if (msg instanceof TcpDiscoveryNodeAddedMessage) |
| stop = true; |
| } |
| } |
| |
| /** |
| * Starts new grid with given index. Method optimize is not invoked. |
| * |
| * @param idx Index of the grid to start. |
| * @return Started grid. |
| * @throws Exception If anything failed. |
| */ |
| private Ignite startGridNoOptimize(int idx) throws Exception { |
| return startGridNoOptimize(getTestGridName(idx)); |
| } |
| |
| /** |
| * Starts new grid with given name. Method optimize is not invoked. |
| * |
| * @param gridName Grid name. |
| * @return Started grid. |
| * @throws Exception If failed. |
| */ |
| private Ignite startGridNoOptimize(String gridName) throws Exception { |
| return G.start(getConfiguration(gridName)); |
| } |
| } |