/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.spi.discovery.tcp;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.jetbrains.annotations.Nullable;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
import static org.apache.ignite.spi.IgnitePortProtocol.UDP;

/**
 * Test for {@link TcpDiscoverySpi}.
 */
public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
    /** IP finder given to every node's discovery SPI unless {@code getConfiguration} installs a name-specific one. */
    private TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /**
     * Discovery SPI instances keyed by grid name, filled in by {@code getConfiguration}.
     * The map is synchronized because some tests start grids from several threads at once
     * (see {@code testNodeConnectMessageSize}), which would otherwise insert into a plain
     * {@link HashMap} concurrently.
     */
    private Map<String, TcpDiscoverySpi> discoMap =
        Collections.synchronizedMap(new HashMap<String, TcpDiscoverySpi>());

    /** Node ID forced on started nodes by {@code getConfiguration}; ignored while {@code null}. */
    private UUID nodeId;

    /** One-shot discovery SPI override: consumed by the next {@code getConfiguration} call on the same thread. */
    private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>();

    /**
     * Creates the test instance, passing {@code false} to the base-class constructor.
     * NOTE(review): the flag presumably tells the base class not to start a grid automatically;
     * confirm against {@code GridCommonAbstractTest}.
     *
     * @throws Exception If fails.
     */
    public TcpDiscoverySelfTest() throws Exception {
        super(false);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        // An SPI parked in the thread-local wins and is consumed by this call;
        // otherwise the SPI class is picked from the grid name.
        TcpDiscoverySpi discoSpi = nodeSpi.get();

        if (discoSpi != null)
            nodeSpi.set(null);
        else if (gridName.contains("testPingInterruptedOnNodeFailedFailingNode"))
            discoSpi = new TestTcpDiscoverySpi();
        else
            discoSpi = new TcpDiscoverySpi();

        // Remember the SPI so that tests can look it up by grid name.
        discoMap.put(gridName, discoSpi);

        // Baseline discovery settings; some are overridden below for specific grid names.
        discoSpi.setIpFinder(ipFinder);
        discoSpi.setNetworkTimeout(2500);
        discoSpi.setHeartbeatFrequency(1000);
        discoSpi.setMaxMissedHeartbeats(3);
        discoSpi.setIpFinderCleanFrequency(5000);
        discoSpi.setJoinTimeout(5000);

        cfg.setDiscoverySpi(discoSpi);
        cfg.setCacheConfiguration();
        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
        cfg.setIncludeProperties();

        boolean loopbackProblemTest = gridName.contains("LoopbackProblemTest");

        if (!loopbackProblemTest)
            cfg.setLocalHost("127.0.0.1");

        boolean failureDetectionTest = gridName.contains("testFailureDetectionOnNodePing");

        if (failureDetectionTest) {
            // To make test faster: on Windows 1 connect takes 1 second.
            discoSpi.setReconnectCount(1);
            discoSpi.setHeartbeatFrequency(40000);
        }

        cfg.setConnectorConfiguration(null);

        if (nodeId != null)
            cfg.setNodeId(nodeId);

        // Name-specific IP finder or failure detection set-up; at most one branch applies.
        if (gridName.contains("NonSharedIpFinder")) {
            TcpDiscoveryVmIpFinder vmFinder = new TcpDiscoveryVmIpFinder();

            vmFinder.setAddresses(Arrays.asList("127.0.0.1:47501"));

            discoSpi.setIpFinder(vmFinder);
        }
        else if (gridName.contains("MulticastIpFinder")) {
            TcpDiscoveryMulticastIpFinder mcastFinder = new TcpDiscoveryMulticastIpFinder();

            mcastFinder.setAddressRequestAttempts(10);
            mcastFinder.setMulticastGroup(GridTestUtils.getNextMulticastGroup(getClass()));
            mcastFinder.setMulticastPort(GridTestUtils.getNextMulticastPort(getClass()));

            discoSpi.setIpFinder(mcastFinder);

            // Loopback multicast discovery is not working on Mac OS
            // (possibly due to http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7122846).
            if (U.isMacOs())
                discoSpi.setLocalAddress(F.first(U.allLocalIps()));
        }
        else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
            cfg.setFailureDetectionTimeout(30_000);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        // Drop the per-test registry of discovery SPIs before delegating to the base-class cleanup.
        discoMap = null;

        super.afterTest();
    }

    /**
     * Starts a single node and stops it again. The stop is issued from {@code finally},
     * so it is attempted even when the start throws.
     *
     * @throws Exception If any error occurs.
     */
    public void testSingleNodeStartStop() throws Exception {
        try {
            startGrid(1);
        }
        finally {
            stopGrid(1);
        }
    }

    /**
     * Starts three nodes and checks through their discovery SPIs that remote nodes are known and carry
     * a last successful address. From node 1 both other nodes are checked, including that the last
     * successful address is the first one returned by {@code getNodeAddresses}, and node 3 is pinged.
     * From node 2 nodes 1 and 3 are checked; from node 3 only node 1.
     *
     * @throws Exception If any error occurs.
     */
    public void testThreeNodesStartStop() throws Exception {
        try {
            IgniteEx first = startGrid(1);
            IgniteEx second = startGrid(2);
            IgniteEx third = startGrid(3);

            TcpDiscoverySpi firstSpi = (TcpDiscoverySpi)first.configuration().getDiscoverySpi();
            TcpDiscoverySpi secondSpi = (TcpDiscoverySpi)second.configuration().getDiscoverySpi();
            TcpDiscoverySpi thirdSpi = (TcpDiscoverySpi)third.configuration().getDiscoverySpi();

            // Node 1 looks at node 2.
            TcpDiscoveryNode rmt = (TcpDiscoveryNode)firstSpi.getNode(second.localNode().id());

            assertNotNull(rmt);
            assertNotNull(rmt.lastSuccessfulAddress());

            LinkedHashSet<InetSocketAddress> rmtAddrs = firstSpi.getNodeAddresses(rmt);

            assertEquals(rmtAddrs.iterator().next(), rmt.lastSuccessfulAddress());

            // Node 1 pings and then looks at node 3.
            assertTrue(firstSpi.pingNode(third.localNode().id()));

            rmt = (TcpDiscoveryNode)firstSpi.getNode(third.localNode().id());

            assertNotNull(rmt);
            assertNotNull(rmt.lastSuccessfulAddress());

            rmtAddrs = firstSpi.getNodeAddresses(rmt);

            assertEquals(rmtAddrs.iterator().next(), rmt.lastSuccessfulAddress());

            // Node 2 looks at node 1.
            rmt = (TcpDiscoveryNode)secondSpi.getNode(first.localNode().id());

            assertNotNull(rmt);
            assertNotNull(rmt.lastSuccessfulAddress());

            // Node 2 looks at node 3.
            rmt = (TcpDiscoveryNode)secondSpi.getNode(third.localNode().id());

            assertNotNull(rmt);
            assertNotNull(rmt.lastSuccessfulAddress());

            // Node 3 looks at node 1.
            rmt = (TcpDiscoveryNode)thirdSpi.getNode(first.localNode().id());

            assertNotNull(rmt);
            assertNotNull(rmt.lastSuccessfulAddress());
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts one node, then four more from concurrent threads, marshals the collection of all ring nodes
     * known to the first node and logs the resulting size in kilobytes. Nothing is asserted.
     *
     * @throws Exception If any errors occur.
     */
    public void testNodeConnectMessageSize() throws Exception {
        try {
            Ignite first = startGrid(1);

            // Index 1 is taken by the node above, so the starter threads use 2 and up.
            final AtomicInteger nextIdx = new AtomicInteger(1);

            Callable<Object> starter = new Callable<Object>() {
                @Nullable @Override public Object call() throws Exception {
                    startGrid(nextIdx.incrementAndGet());

                    return null;
                }
            };

            GridTestUtils.runMultiThreaded(starter, 4, "grid-starter");

            TcpDiscoverySpi firstSpi = discoMap.get(first.name());

            Collection<TcpDiscoveryNode> ringNodes = ((ServerImpl)firstSpi.impl).ring().allNodes();

            ByteArrayOutputStream out = new ByteArrayOutputStream();

            first.configuration().getMarshaller().marshal(ringNodes, out);

            double sizeKb = out.size() / 1024.0;

            info(">>> Approximate node connect message size [topSize=" + ringNodes.size() +
                ", msgSize=" + sizeKb + "KB]");
        }
        finally {
            stopAllGrids(false);
        }
    }

    /**
     * Starts three nodes and pings every running grid (including the local one) through each
     * registered discovery SPI, expecting every ping to succeed.
     *
     * @throws Exception If any error occurs.
     */
    public void testPing() throws Exception {
        try {
            startGrid(1);
            startGrid(2);
            startGrid(3);

            info("Nodes were started");

            for (Map.Entry<String, TcpDiscoverySpi> entry : discoMap.entrySet()) {
                String pinger = entry.getKey();

                DiscoverySpi pingerSpi = entry.getValue();

                for (Ignite target : G.allGrids()) {
                    UUID targetId = target.cluster().localNode().id();

                    boolean pinged = pingerSpi.pingNode(targetId);

                    assert pinged : pinger + " failed to ping " + targetId + " of " + target.name();

                    info(pinger + " pinged " + targetId + " of " + target.name());
                }
            }

            info("All nodes pinged successfully.");
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Failure detection scenario where the first started node pings the third one after its failure
     * has been simulated.
     *
     * @throws Exception If any error occurs.
     */
    public void testFailureDetectionOnNodePing1() throws Exception {
        try {
            Ignite pinger = startGrid("testFailureDetectionOnNodePingCoordinator");

            startGrid("testFailureDetectionOnNodePing2");

            Ignite victim = startGrid("testFailureDetectionOnNodePing3");

            testFailureDetectionOnNodePing(pinger, victim);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Failure detection scenario where the third started node pings the second one after its failure
     * has been simulated.
     *
     * @throws Exception If any error occurs.
     */
    public void testFailureDetectionOnNodePing2() throws Exception {
        try {
            startGrid("testFailureDetectionOnNodePingCoordinator");

            Ignite victim = startGrid("testFailureDetectionOnNodePing2");
            Ignite pinger = startGrid("testFailureDetectionOnNodePing3");

            testFailureDetectionOnNodePing(pinger, victim);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Failure detection scenario where the second started node pings the first one after its failure
     * has been simulated.
     *
     * @throws Exception If any error occurs.
     */
    public void testFailureDetectionOnNodePing3() throws Exception {
        try {
            Ignite victim = startGrid("testFailureDetectionOnNodePingCoordinator");
            Ignite pinger = startGrid("testFailureDetectionOnNodePing2");

            startGrid("testFailureDetectionOnNodePing3");

            testFailureDetectionOnNodePing(pinger, victim);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Simulates failure of {@code failedNode}, pings it from {@code pingingNode} and checks that the ping
     * reports failure and that {@code pingingNode} fires a node-failed event within seven seconds.
     *
     * @param pingingNode Node that performs the ping and must observe the failure event.
     * @param failedNode Node whose failure is simulated.
     * @throws Exception If any error occurs.
     */
    private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception {
        final CountDownLatch cnt = new CountDownLatch(1);

        final UUID failedNodeId = failedNode.cluster().localNode().id();

        pingingNode.events().localListen(
            new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    cnt.countDown();

                    return true;
                }
            },
            EventType.EVT_NODE_FAILED
        );

        info("Nodes were started");

        discoMap.get(failedNode.name()).simulateNodeFailure();

        TcpDiscoverySpi spi = discoMap.get(pingingNode.name());

        boolean res = spi.pingNode(failedNodeId);

        assertFalse("Ping is ok for node " + failedNodeId + ", but had to fail.", res);

        // Heartbeat interval is 40 seconds, but we should detect node failure faster.
        // A JUnit assertion is used so that the wait is not skipped when JVM assertions are disabled.
        assertTrue("Node failed event was not received on " + pingingNode.name() + " for node " + failedNodeId,
            cnt.await(7, SECONDS));
    }

    /**
     * Checks that a ping in progress ends with a negative result once the pinged node fails, well before
     * the pinging node's failure detection timeout elapses, and that the pinging node then receives a
     * node-failed event for that node.
     *
     * @throws Exception If any error occurs.
     */
    public void testPingInterruptedOnNodeFailed() throws Exception {
        try {
            // The grid names select special set-up in getConfiguration: a 30 second failure detection
            // timeout for the pinging node and a TestTcpDiscoverySpi for the failing node.
            final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
            final Ignite failedNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
            startGrid("testPingInterruptedOnNodeFailedSimpleNode");

            // NOTE(review): presumably stops the node from answering pings - confirm in TestTcpDiscoverySpi.
            ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true;

            final UUID failedNodeId = failedNode.cluster().localNode().id();

            // Released by the pinging thread right before it issues the ping.
            final CountDownLatch pingLatch = new CountDownLatch(1);

            // Released when the node-failed event for the failed node arrives.
            final CountDownLatch eventLatch = new CountDownLatch(1);

            // Starts as true so that a ping that never completes cannot look like a failed ping.
            final AtomicBoolean pingRes = new AtomicBoolean(true);

            final AtomicBoolean failRes = new AtomicBoolean(false);

            long startTs = System.currentTimeMillis();

            pingingNode.events().localListen(
                new IgnitePredicate<Event>() {
                    @Override public boolean apply(Event event) {
                        // Only the failure of the node under test counts.
                        if (((DiscoveryEvent)event).eventNode().id().equals(failedNodeId)) {
                            failRes.set(true);
                            eventLatch.countDown();
                        }

                        return true;
                    }
                },
                EventType.EVT_NODE_FAILED);

            // Pinging thread: announces itself through the latch, then pings and records the result.
            IgniteInternalFuture<?> pingFut = multithreadedAsync(
                new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        pingLatch.countDown();

                        pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
                            failedNodeId));

                        return null;
                    }
                }, 1);

            // Failing thread: waits for the pinging thread, gives it three seconds, then simulates the failure.
            IgniteInternalFuture<?> failingFut = multithreadedAsync(
                new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        pingLatch.await();

                        Thread.sleep(3000);

                        ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).simulateNodeFailure();

                        return null;
                    }
                }, 1);

            failingFut.get();
            pingFut.get();

            assertFalse(pingRes.get());

            // The whole sequence must take less than half of the pinging node's failure detection timeout.
            assertTrue(System.currentTimeMillis() - startTs <
                pingingNode.configuration().getFailureDetectionTimeout() / 2);

            assertTrue(eventLatch.await(7, TimeUnit.SECONDS));
            assertTrue(failRes.get());
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Checks that a ping in progress ends with a negative result once the pinged node is stopped, and that
     * this happens in less than half of the pinging node's failure detection timeout.
     *
     * @throws Exception If any error occurs.
     */
    public void testPingInterruptedOnNodeLeft() throws Exception {
        try {
            // The "testPingInterruptedOnNodeFailed..." grid names are reused so that getConfiguration applies
            // the same special set-up (30 second failure detection timeout, TestTcpDiscoverySpi).
            final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
            final Ignite leftNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
            startGrid("testPingInterruptedOnNodeFailedSimpleNode");

            // NOTE(review): presumably stops the node from answering pings - confirm in TestTcpDiscoverySpi.
            ((TestTcpDiscoverySpi)leftNode.configuration().getDiscoverySpi()).ignorePingResponse = true;

            // Released by the pinging thread right before it issues the ping.
            final CountDownLatch pingLatch = new CountDownLatch(1);

            // Starts as true so that a ping that never completes cannot look like a failed ping.
            final AtomicBoolean pingRes = new AtomicBoolean(true);

            long startTs = System.currentTimeMillis();

            // Pinging thread: announces itself through the latch, then pings and records the result.
            IgniteInternalFuture<?> pingFut = multithreadedAsync(
                new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        pingLatch.countDown();

                        pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
                            leftNode.cluster().localNode().id()));

                        return null;
                    }
                }, 1);

            // Stopping thread: waits for the pinging thread, gives it three seconds, then stops the pinged node.
            IgniteInternalFuture<?> stoppingFut = multithreadedAsync(
                new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        pingLatch.await();

                        Thread.sleep(3000);

                        stopGrid("testPingInterruptedOnNodeFailedFailingNode");

                        return null;
                    }
                }, 1);

            stoppingFut.get();
            pingFut.get();

            assertFalse(pingRes.get());

            // The whole sequence must take less than half of the pinging node's failure detection timeout.
            assertTrue(System.currentTimeMillis() - startTs <
                pingingNode.configuration().getFailureDetectionTimeout() / 2);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Checks that a running node receives a node-joined event for each of two nodes started afterwards,
     * and that at the time of the event the joined node is known to the local discovery SPI and visible.
     *
     * @throws Exception If any error occurs.
     */
    public void testNodeAdded() throws Exception {
        try {
            final Ignite g1 = startGrid(1);

            // One count per expected join.
            final CountDownLatch cnt = new CountDownLatch(2);

            g1.events().localListen(
                new IgnitePredicate<Event>() {
                    @Override public boolean apply(Event evt) {
                        info("Node joined: " + evt.message());

                        DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

                        TcpDiscoveryNode node = ((TcpDiscoveryNode)discoMap.get(g1.name()).
                            getNode(discoEvt.eventNode().id()));

                        assert node != null && node.visible();

                        cnt.countDown();

                        return true;
                    }
                },
                EventType.EVT_NODE_JOINED
            );

            startGrid(2);
            startGrid(3);

            info("Nodes were started");

            // A JUnit assertion is used so that the wait is not skipped when JVM assertions are disabled.
            assertTrue("Node joined events were not received for both started nodes", cnt.await(1, SECONDS));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts three nodes, stops the third and the second one and expects the first node to be notified
     * twice (node left or node failed) within one second.
     *
     * @throws Exception If any error occurs.
     */
    public void testOrdinaryNodeLeave() throws Exception {
        try {
            Ignite survivor = startGrid(1);

            startGrid(2);
            startGrid(3);

            // One count per node that is about to be stopped.
            final CountDownLatch goneLatch = new CountDownLatch(2);

            IgnitePredicate<Event> goneLsnr = new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    goneLatch.countDown();

                    return true;
                }
            };

            survivor.events().localListen(goneLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

            info("Nodes were started");

            stopGrid(3);
            stopGrid(2);

            boolean bothGone = goneLatch.await(1, SECONDS);

            assert bothGone;
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Stops the first started node of a two-node topology and checks that the remaining node is notified
     * (node left or node failed). Then starts one more node and checks that the remaining node receives
     * a node-joined event for it.
     *
     * @throws Exception If any error occurs.
     */
    public void testCoordinatorNodeLeave() throws Exception {
        try {
            startGrid(1);
            Ignite g2 = startGrid(2);

            final CountDownLatch cnt = new CountDownLatch(1);

            g2.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    cnt.countDown();

                    return true;
                }
            }, EVT_NODE_LEFT, EVT_NODE_FAILED);

            info("Nodes were started");

            stopGrid(1);

            // JUnit assertions are used so that the waits are not skipped when JVM assertions are disabled.
            assertTrue("Remaining node was not notified that node 1 is gone", cnt.await(1, SECONDS));

            // Start new grid, ensure that added to topology
            final CountDownLatch cnt2 = new CountDownLatch(1);

            g2.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    cnt2.countDown();

                    return true;
                }
            }, EVT_NODE_JOINED);

            startGrid(3);

            assertTrue("Remaining node was not notified that node 3 joined", cnt2.await(1, SECONDS));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts three nodes, simulates failure of the second and the third one and expects the first node
     * to receive two node-failed events within 25 seconds.
     *
     * @throws Exception If any error occurs.
     */
    public void testOrdinaryNodeFailure() throws Exception {
        try {
            Ignite g1 = startGrid(1);
            Ignite g2 = startGrid(2);
            Ignite g3 = startGrid(3);

            // One count per node whose failure is simulated.
            final CountDownLatch cnt = new CountDownLatch(2);

            g1.events().localListen(
                new IgnitePredicate<Event>() {
                    @Override public boolean apply(Event evt) {
                        cnt.countDown();

                        return true;
                    }
                },
                EventType.EVT_NODE_FAILED
            );

            info("Nodes were started");

            discoMap.get(g2.name()).simulateNodeFailure();
            discoMap.get(g3.name()).simulateNodeFailure();

            // A JUnit assertion is used so that the wait is not skipped when JVM assertions are disabled.
            assertTrue("Node failed events were not received for both failed nodes", cnt.await(25, SECONDS));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts two nodes, simulates failure of the first started one and expects the second node to receive
     * a node-failed event within 20 seconds.
     *
     * @throws Exception If any error occurs.
     */
    public void testCoordinatorNodeFailure() throws Exception {
        try {
            Ignite g1 = startGrid(1);
            Ignite g2 = startGrid(2);

            final CountDownLatch cnt = new CountDownLatch(1);

            g2.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    cnt.countDown();

                    return true;
                }
            }, EventType.EVT_NODE_FAILED);

            info("Nodes were started");

            discoMap.get(g1.name()).simulateNodeFailure();

            // A JUnit assertion is used so that the wait is not skipped when JVM assertions are disabled.
            assertTrue("Node failed event was not received for the failed first node", cnt.await(20, SECONDS));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Checks delivery of metrics-updated events. First a single node must see at least one such event.
     * Then a second node is started and each of the two nodes must see a metrics-updated event both for
     * itself and for the other node. Each wait is limited to 10 seconds.
     *
     * @throws Exception If any error occurs.
     */
    public void testMetricsSending() throws Exception {
        // Raised before the grids are stopped so that listeners ignore events fired during shutdown.
        final AtomicBoolean stopping = new AtomicBoolean();

        try {
            final CountDownLatch latch1 = new CountDownLatch(1);

            final Ignite g1 = startGrid(1);

            IgnitePredicate<Event> lsnr1 = new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    info(evt.message());

                    latch1.countDown();

                    return true;
                }
            };

            g1.events().localListen(lsnr1, EVT_NODE_METRICS_UPDATED);

            // JUnit assertions are used below so that the waits are not skipped when JVM assertions are disabled.
            assertTrue("Single node did not receive a metrics update event", latch1.await(10, SECONDS));

            g1.events().stopLocalListen(lsnr1);

            // Latch naming: latch<listening node>_<node the event is about>.
            final CountDownLatch latch1_1 = new CountDownLatch(1);
            final CountDownLatch latch1_2 = new CountDownLatch(1);
            final CountDownLatch latch2_1 = new CountDownLatch(1);
            final CountDownLatch latch2_2 = new CountDownLatch(1);

            final Ignite g2 = startGrid(2);

            g2.events().localListen(
                new IgnitePredicate<Event>() {
                    @Override public boolean apply(Event evt) {
                        if (stopping.get())
                            return true;

                        info(evt.message());

                        UUID id = ((DiscoveryEvent) evt).eventNode().id();

                        if (id.equals(g1.cluster().localNode().id()))
                            latch2_1.countDown();
                        else if (id.equals(g2.cluster().localNode().id()))
                            latch2_2.countDown();
                        else
                            assert false : "Event fired for unknown node.";

                        return true;
                    }
                },
                EVT_NODE_METRICS_UPDATED
            );

            g1.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    if (stopping.get())
                        return true;

                    info(evt.message());

                    UUID id = ((DiscoveryEvent) evt).eventNode().id();

                    if (id.equals(g1.cluster().localNode().id()))
                        latch1_1.countDown();
                    else if (id.equals(g2.cluster().localNode().id()))
                        latch1_2.countDown();
                    else
                        assert false : "Event fired for unknown node.";

                    return true;
                }
            }, EVT_NODE_METRICS_UPDATED);

            assertTrue("Node 1 did not receive its own metrics update", latch1_1.await(10, SECONDS));
            assertTrue("Node 1 did not receive metrics update of node 2", latch1_2.await(10, SECONDS));
            assertTrue("Node 2 did not receive metrics update of node 1", latch2_1.await(10, SECONDS));
            assertTrue("Node 2 did not receive its own metrics update", latch2_2.await(10, SECONDS));
        }
        finally {
            stopping.set(true);

            stopAllGrids();
        }
    }

    /**
     * Checks topology events when a node fails at the moment it is about to send a node-added message.
     * A send listener on the second started node simulates that node's failure and aborts the send as soon
     * as a {@link TcpDiscoveryNodeAddedMessage} is passed to it. A third node is then started, and the
     * first node must observe two node-joined events and one node-failed event.
     *
     * @throws Exception If any error occurs.
     */
    public void testFailBeforeNodeAddedSent() throws Exception {
        try {
            Ignite g1 = startGrid(1);

            final CountDownLatch joinCnt = new CountDownLatch(2);
            final CountDownLatch failCnt = new CountDownLatch(1);

            g1.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    if (evt.type() == EVT_NODE_JOINED)
                        joinCnt.countDown();
                    else if (evt.type() == EVT_NODE_FAILED)
                        failCnt.countDown();
                    else
                        assert false : "Unexpected event type: " + evt;

                    return true;
                }
            }, EVT_NODE_JOINED, EVT_NODE_FAILED);

            final Ignite g = startGrid("FailBeforeNodeAddedSentSpi");

            discoMap.get(g.name()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
                @Override public void apply(TcpDiscoveryAbstractMessage msg) {
                    if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                        discoMap.get(g.name()).simulateNodeFailure();

                        // Thrown on purpose so that the message is not sent.
                        throw new RuntimeException("Avoid message sending: " + msg.getClass());
                    }
                }
            });

            startGrid(3);

            // JUnit assertions are used so that the waits are not skipped when JVM assertions are disabled.
            assertTrue("First node did not receive two node joined events", joinCnt.await(10, SECONDS));
            assertTrue("First node did not receive the node failed event", failCnt.await(10, SECONDS));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Checks topology events when a node fails at the moment it is about to send a node-left message.
     * A send listener on the third started node simulates that node's failure and aborts the send as soon
     * as a {@link TcpDiscoveryNodeLeftMessage} is passed to it. After one more node is started, node 1 is
     * stopped and the last started node must observe a node-failed event within 20 seconds.
     *
     * @throws Exception If any error occurs.
     */
    public void testFailBeforeNodeLeftSent() throws Exception {
        try {
            startGrid(1);
            startGrid(2);

            final Ignite g = startGrid("FailBeforeNodeLeftSentSpi");

            discoMap.get(g.name()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
                @Override public void apply(TcpDiscoveryAbstractMessage msg) {
                    if (msg instanceof TcpDiscoveryNodeLeftMessage) {
                        discoMap.get(g.name()).simulateNodeFailure();

                        // Thrown on purpose so that the message is not sent.
                        throw new RuntimeException("Avoid message sending: " + msg.getClass());
                    }
                }
            });

            Ignite g3 = startGrid(3);

            final CountDownLatch cnt = new CountDownLatch(1);

            g3.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    cnt.countDown();

                    return true;
                }
            }, EVT_NODE_FAILED);

            stopGrid(1);

            // A JUnit assertion is used so that the wait is not skipped when JVM assertions are disabled.
            assertTrue("Node failed event was not received by the last started node", cnt.await(20, SECONDS));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Registers two extra addresses with the IP finder, starts a node and waits until only one address
     * stays registered. Then unregisters whatever is registered, registers the two extra addresses again
     * and waits once more until exactly one address is registered.
     *
     * @throws Exception If any error occurs.
     */
    public void testIpFinderCleaning() throws Exception {
        try {
            // Addresses that do not belong to any node started by this test.
            Collection<InetSocketAddress> extraAddrs = Arrays.asList(new InetSocketAddress("1.1.1.1", 1024),
                new InetSocketAddress("1.1.1.2", 1024));

            // True once the IP finder holds exactly one address.
            GridAbsPredicate singleAddrLeft = new GridAbsPredicate() {
                @Override public boolean apply() {
                    return ipFinder.getRegisteredAddresses().size() == 1;
                }
            };

            ipFinder.registerAddresses(extraAddrs);

            Ignite node = startGrid(1);

            long failureDetectTimeout = node.configuration().getFailureDetectionTimeout();

            long cleanFreq = discoMap.get(node.name()).getIpFinderCleanFrequency();

            // One and a half clean periods plus the failure detection timeout.
            long timeout = (long)(cleanFreq * 1.5) + failureDetectTimeout;

            GridTestUtils.waitForCondition(singleAddrLeft, timeout);

            if (ipFinder.getRegisteredAddresses().size() != 1) {
                log.error("Failed to wait for IP cleanup, will dump threads.");

                U.dumpThreads(log);
            }

            assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses();

            // Check that missing addresses are returned back.
            ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister valid address.

            ipFinder.registerAddresses(extraAddrs);

            GridTestUtils.waitForCondition(singleAddrLeft, timeout);

            assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses();
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts node "NonSharedIpFinder-1" while node "NonSharedIpFinder-2" is started from another thread
     * four seconds later, and expects the node started from the test thread to get topology order 2.
     * Both names make {@code getConfiguration} install a separate IP finder that only knows
     * {@code 127.0.0.1:47501}.
     *
     * @throws Exception If any error occurs.
     */
    public void testNonSharedIpFinder() throws Exception {
        try {
            IgniteInternalFuture<?> starterFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    Thread.sleep(4000);

                    return startGrid("NonSharedIpFinder-2");
                }
            }, 1, "grid-starter");

            // This node should wait until any node "from ipFinder" appears, see log messages.
            Ignite g = startGrid("NonSharedIpFinder-1");

            assert g.cluster().localNode().order() == 2;

            // Surface a failure of the delayed start and make sure it has finished before grids are stopped.
            starterFut.get();
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts five nodes that use the multicast IP finder. After each start it checks that the topology
     * has grown by one node and that the node's port records contain a UDP record for the multicast port
     * of its IP finder.
     *
     * @throws Exception If any error occurs.
     */
    public void testMulticastIpFinder() throws Exception {
        try {
            for (int idx = 0; idx < 5; idx++) {
                Ignite node = startGrid("MulticastIpFinder-" + idx);

                assertEquals(idx + 1, node.cluster().nodes().size());

                TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)node.configuration().getDiscoverySpi();

                TcpDiscoveryMulticastIpFinder mcastFinder = (TcpDiscoveryMulticastIpFinder)discoSpi.getIpFinder();

                boolean portRegistered = false;

                for (GridPortRecord rec : ((IgniteKernal)node).context().ports().records()) {
                    // Skip everything that is not the UDP record of the multicast port.
                    if (rec.protocol() != UDP || rec.port() != mcastFinder.getMulticastPort())
                        continue;

                    portRegistered = true;

                    break;
                }

                assertTrue("TcpDiscoveryMulticastIpFinder should register port." , portRegistered);
            }
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Makes the IP finder non-shared with the single address {@code "some-host"} and expects
     * node start to fail with {@link IgniteCheckedException}.
     *
     * @throws Exception If any error occurs.
     */
    public void testInvalidAddressIpFinder() throws Exception {
        ipFinder.setShared(false);

        ipFinder.setAddresses(Collections.singletonList("some-host"));

        Callable<Object> startNode = new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                startGrid(1);

                return null;
            }
        };

        try {
            GridTestUtils.assertThrows(log, startNode, IgniteCheckedException.class, null);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Expects start of node {@code "NonSharedIpFinder-1"} to fail with {@link IgniteCheckedException}
     * that has {@link IgniteSpiException} in its cause chain.
     *
     * @throws Exception If any error occurs.
     */
    public void testJoinTimeout() throws Exception {
        Callable<Object> startNode = new Callable<Object>() {
            @Override public Object call() throws Exception {
                startGrid("NonSharedIpFinder-1");

                return null;
            }
        };

        try {
            // This start will fail as expected.
            Throwable err = GridTestUtils.assertThrows(log, startNode, IgniteCheckedException.class, null);

            assert X.hasCause(err, IgniteSpiException.class) : "Unexpected exception: " + err;
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Registers a {@code 127.0.0.1} and an {@code unknown-host} address for every port in
     * {@code [47500, 47520)} in the IP finder, checks that the IP finder is shared and starts a node.
     *
     * @throws Exception If failed.
     */
    public void testDirtyIpFinder() throws Exception {
        try {
            // Dirty IP finder
            for (int port = 47500; port < 47520; port++) {
                InetSocketAddress locAddr = new InetSocketAddress("127.0.0.1", port);
                InetSocketAddress unknownAddr = new InetSocketAddress("unknown-host", port);

                ipFinder.registerAddresses(Arrays.asList(locAddr, unknownAddr));
            }

            assert ipFinder.isShared();

            startGrid(1);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts node 1, assigns a random UUID to the {@code nodeId} field, starts node 2 and then
     * expects start of node 3 to fail with {@link IgniteCheckedException}.
     *
     * @throws Exception If any error occurs.
     */
    public void testDuplicateId() throws Exception {
        Callable<Object> startThird = new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                // Exception will be thrown and output to log.
                startGrid(3);

                return null;
            }
        };

        try {
            // Random ID.
            startGrid(1);

            nodeId = UUID.randomUUID();

            startGrid(2);

            // Duplicate ID.
            GridTestUtils.assertThrows(log, startThird, IgniteCheckedException.class, null);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts node 1 without configuration optimization and expects start of node
     * {@code "LoopbackProblemTest"} to fail with {@link IgniteException}. Does nothing on Windows and Mac OS.
     *
     * @throws Exception If any error occurs.
     */
    public void testLoopbackProblemFirstNodeOnLoopback() throws Exception {
        // On Windows and Mac machines two nodes can reside on the same port
        // (if one node has localHost="127.0.0.1" and another has localHost="0.0.0.0").
        // So two nodes do not even discover each other.
        boolean skip = U.isWindows() || U.isMacOs();

        if (skip)
            return;

        Callable<Object> startSecond = new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                // Exception will be thrown because we start node which does not use loopback address,
                // but the first node does.
                startGridNoOptimize("LoopbackProblemTest");

                return null;
            }
        };

        try {
            startGridNoOptimize(1);

            GridTestUtils.assertThrows(log, startSecond, IgniteException.class, null);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts node {@code "LoopbackProblemTest"} without configuration optimization and expects
     * start of node 1 to fail with {@link IgniteException}. Does nothing on Windows and Mac OS.
     *
     * @throws Exception If any error occurs.
     */
    public void testLoopbackProblemSecondNodeOnLoopback() throws Exception {
        boolean skip = U.isWindows() || U.isMacOs();

        if (skip)
            return;

        Callable<Object> startSecond = new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                // Exception will be thrown because we start node which uses loopback address,
                // but the first node does not.
                startGridNoOptimize(1);

                return null;
            }
        };

        try {
            startGridNoOptimize("LoopbackProblemTest");

            GridTestUtils.assertThrows(log, startSecond, IgniteException.class, null);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts five nodes and checks that all of them report the same positive grid start time.
     * Then stops the node with order {@code 1}, starts one more node and checks that the remaining
     * and the new nodes still report the same grid start time.
     *
     * @throws Exception If any error occurs.
     */
    public void testGridStartTime() throws Exception {
        try {
            startGridsMultiThreaded(5);

            Long expStartTime = null;

            IgniteKernal oldest = null;

            Collection<IgniteKernal> others = new ArrayList<>();

            for (int idx = 0; idx < 5; idx++) {
                IgniteKernal kernal = (IgniteKernal)grid(idx);

                assertTrue(kernal.context().discovery().gridStartTime() > 0);

                // The first inspected node provides the reference value for all the others.
                if (idx == 0)
                    expStartTime = kernal.context().discovery().gridStartTime();
                else
                    assertEquals(expStartTime, (Long)kernal.context().discovery().gridStartTime());

                if (kernal.localNode().order() != 1)
                    others.add(kernal);
                else
                    oldest = kernal;
            }

            assertNotNull(oldest);

            stopGrid(oldest.name());

            for (IgniteKernal kernal : others)
                assertEquals(expStartTime, (Long)kernal.context().discovery().gridStartTime());

            others.add((IgniteKernal)startGrid(5));

            for (IgniteKernal kernal : others)
                assertEquals(expStartTime, (Long)kernal.context().discovery().gridStartTime());
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Runs the custom event race scenario with cache start from node 1 and without coordinator stop.
     *
     * @throws Exception If failed
     */
    public void testCustomEventRace1_1() throws Exception {
        final boolean cacheStartFrom1 = true;
        final boolean stopCrd = false;

        try {
            customEventRace1(cacheStartFrom1, stopCrd);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Runs the custom event race scenario with cache start from node 0 and without coordinator stop.
     *
     * @throws Exception If failed
     */
    public void testCustomEventRace1_2() throws Exception {
        final boolean cacheStartFrom1 = false;
        final boolean stopCrd = false;

        try {
            customEventRace1(cacheStartFrom1, stopCrd);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Runs the custom event race scenario with cache start from node 1 and with coordinator stop.
     *
     * @throws Exception If failed
     */
    public void testCustomEventRace1_3() throws Exception {
        final boolean cacheStartFrom1 = true;
        final boolean stopCrd = true;

        try {
            customEventRace1(cacheStartFrom1, stopCrd);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Custom event race scenario. Node 0 uses {@link TestCustomEventRaceSpi} with both latches set, so
     * its first {@link TcpDiscoveryNodeAddedMessage} send counts down the first latch and then waits
     * for the second one. While node 2 is being started in a separate thread and that send is held,
     * a cache is created from another thread. After both operations complete, the cache is checked
     * from node 2 and from a newly started node 3.
     *
     * @param cacheStartFrom1 If {@code true} starts cache from node1.
     * @param stopCrd If {@code true} stops coordinator.
     * @throws Exception If failed
     */
    private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception {
        TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();

        nodeSpi.set(spi0);

        final Ignite ignite0 = startGrid(0);

        nodeSpi.set(new TestCustomEventRaceSpi());

        final Ignite ignite1 = startGrid(1);

        CountDownLatch latch1 = new CountDownLatch(1);
        CountDownLatch latch2 = new CountDownLatch(1);

        // From now on node 0 signals latch1 and blocks on latch2 when it sends a node added message.
        spi0.nodeAdded1 = latch1;
        spi0.nodeAdded2 = latch2;
        spi0.debug = true;

        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                log.info("Start 2");

                nodeSpi.set(new TestCustomEventRaceSpi());

                startGrid(2);

                return null;
            }
        });

        // Wait until node 0 is about to send the node added message.
        latch1.await();

        final String CACHE_NAME = "cache";

        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();

                ccfg.setName(CACHE_NAME);

                Ignite ignite = cacheStartFrom1 ? ignite1 : ignite0;

                ignite.createCache(ccfg);

                return null;
            }
        });

        if (stopCrd) {
            // With 'stop' set the SPI of node 0 drops node added messages instead of sending them.
            spi0.stop = true;

            latch2.countDown();

            ignite0.close();
        }
        else {
            U.sleep(500);

            latch2.countDown();
        }

        fut1.get();
        fut2.get();

        IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME);

        assertNotNull(cache);

        cache.put(1, 1);

        assertEquals(1, cache.get(1));

        nodeSpi.set(new TestCustomEventRaceSpi());

        Ignite ignite = startGrid(3);

        cache = ignite.cache(CACHE_NAME);

        // Fail with a clear assertion (not NPE) if the new node does not see the cache.
        assertNotNull(cache);

        cache.put(2, 2);

        assertEquals(1, cache.get(1));
        assertEquals(2, cache.get(2));
    }

    /**
     * Runs the custom event coordinator failure scenario with two nodes.
     *
     * @throws Exception If failed
     */
    public void testCustomEventCoordinatorFailure1() throws Exception {
        final boolean twoNodes = true;

        try {
            customEventCoordinatorFailure(twoNodes);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Runs the custom event coordinator failure scenario with three nodes.
     *
     * @throws Exception If failed
     */
    public void testCustomEventCoordinatorFailure2() throws Exception {
        final boolean twoNodes = false;

        try {
            customEventCoordinatorFailure(twoNodes);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Makes the discovery SPI of node 0 throw {@link RuntimeException} on every message send
     * (see {@link TestMessageWorkerFailureSpi1}), then checks that node 1 receives
     * {@link EventType#EVT_NODE_FAILED} for node 0 and that a call on node 0 fails with
     * {@link IllegalStateException} reporting an invalid grid state.
     *
     * @throws Exception If failed
     */
    public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
        try {
            TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1();

            nodeSpi.set(spi0);

            final Ignite ignite0 = startGrid(0);

            nodeSpi.set(new TcpDiscoverySpi());

            Ignite ignite1 = startGrid(1);

            final AtomicBoolean disconnected = new AtomicBoolean();

            final CountDownLatch latch = new CountDownLatch(1);

            final UUID failedNodeId = ignite0.cluster().localNode().id();

            ignite1.events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    if (evt.type() == EventType.EVT_NODE_FAILED &&
                        failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
                        disconnected.set(true);

                    latch.countDown();

                    return false;
                }
            }, EventType.EVT_NODE_FAILED);

            spi0.stop = true;

            // Distinguish "no event at all" from "event for an unexpected node".
            assertTrue("Failed to wait for node failed event.", latch.await(15, TimeUnit.SECONDS));

            assertTrue("Node failed event was not fired for node 0.", disconnected.get());

            try {
                ignite0.cluster().localNode().id();
            }
            catch (IllegalStateException e) {
                String errMsg = e.getMessage();

                // Message may be null, do not hide the real outcome behind NPE.
                if (errMsg != null && errMsg.contains("Grid is in invalid state to perform this operation"))
                    return;
            }

            fail("Expected IllegalStateException on access to the node with failed ring message worker.");
        }
        finally {
            stopAllGrids();
        }
    }
    /**
     * Starts node 0, then tries to start node 1 with {@link TestMessageWorkerFailureSpi2} and expects
     * this start to fail. After that starts node 1 again and checks that both nodes see two nodes
     * and topology version {@code 4}.
     *
     * @throws Exception If failed
     */
    public void testNodeShutdownOnRingMessageWorkerStartNotFinished() throws Exception {
        try {
            Ignite node0 = startGrid(0);

            nodeSpi.set(new TestMessageWorkerFailureSpi2());

            Exception startErr = null;

            try {
                startGrid(1);
            }
            catch (Exception e) {
                startErr = e;
            }

            // The first attempt to start node 1 must not succeed.
            if (startErr == null)
                fail();

            log.error("Expected error: " + startErr, startErr);

            Ignite node1 = startGrid(1);

            assertEquals(2, node1.cluster().nodes().size());
            assertEquals(4, node1.cluster().topologyVersion());

            assertEquals(2, node0.cluster().nodes().size());
            assertEquals(4, node0.cluster().topologyVersion());
        }
        finally {
            stopAllGrids();
        }
    }


    /**
     * Custom event coordinator failure scenario. Starts two or three nodes with
     * {@link TestCustomEventCoordinatorFailureSpi}, starts cache creation from the last started node
     * in a separate thread, simulates communication failure of node 0 and closes node 0 once its SPI
     * has seen a custom event message. Then checks the cache from node 1 and from one more started node.
     *
     * @param twoNodes If {@code true} starts two nodes, otherwise three.
     * @throws Exception If failed
     */
    private void customEventCoordinatorFailure(boolean twoNodes) throws Exception {
        TestCustomEventCoordinatorFailureSpi crdSpi = new TestCustomEventCoordinatorFailureSpi();

        nodeSpi.set(crdSpi);

        Ignite node0 = startGrid(0);

        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());

        Ignite node1 = startGrid(1);

        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());

        Ignite node2 = null;

        if (!twoNodes)
            node2 = startGrid(2);

        // Cache is created from the last started node.
        final Ignite cacheStarter = node2 == null ? node1 : node2;

        CountDownLatch customEvtLatch = new CountDownLatch(1);

        crdSpi.latch = customEvtLatch;

        final String CACHE_NAME = "test-cache";

        Callable<Void> createCache = new Callable<Void>() {
            @Override public Void call() throws Exception {
                log.info("Create test cache");

                CacheConfiguration cacheCfg = new CacheConfiguration();

                cacheCfg.setName(CACHE_NAME);

                cacheStarter.createCache(cacheCfg);

                return null;
            }
        };

        IgniteInternalFuture<?> createFut = GridTestUtils.runAsync(createCache, "create-cache-thread");

        TcpCommunicationSpi commSpi0 = (TcpCommunicationSpi)node0.configuration().getCommunicationSpi();

        commSpi0.simulateNodeFailure();

        // Released by the SPI of node 0 when it is asked to send a custom event message.
        customEvtLatch.await();

        node0.close();

        createFut.get();

        IgniteCache<Object, Object> cache = grid(1).cache(CACHE_NAME);

        assertNotNull(cache);

        cache.put(1, 1);

        assertEquals(1, cache.get(1));

        log.info("Try start one more node.");

        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());

        Ignite newNode = startGrid(twoNodes ? 2 : 3);

        cache = newNode.cache(CACHE_NAME);

        assertNotNull(cache);

        cache.put(2, 2);

        assertEquals(1, cache.get(1));
        assertEquals(2, cache.get(2));
    }

    /**
     * Coordinator is added in failed list during node start.
     * <p>
     * Starts three nodes with {@link TestFailedNodesSpi} configured with fail order {@code 3}, checks
     * that the third node sees two nodes, waits for stop of node 0 and creates caches on the two
     * remaining nodes.
     *
     * @throws Exception If failed.
     */
    public void testFailedNodes1() throws Exception {
        try {
            final int failOrder = 3;

            nodeSpi.set(new TestFailedNodesSpi(failOrder));

            final Ignite node0 = startGrid(0);

            nodeSpi.set(new TestFailedNodesSpi(failOrder));

            startGrid(1);

            nodeSpi.set(new TestFailedNodesSpi(failOrder));

            Ignite node2 = startGrid(2);

            int visibleNodes = node2.cluster().nodes().size();

            assertEquals(2, visibleNodes);

            waitNodeStop(node0.name());

            tryCreateCache(2);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Coordinator is added in failed list, concurrent nodes start.
     * <p>
     * Starts two nodes sequentially and three more nodes concurrently, all with
     * {@link TestFailedNodesSpi} configured with fail order {@code 3}. Then waits until node 2 sees
     * three remote nodes, waits for stop of node 0 and creates caches on the four remaining nodes.
     *
     * @throws Exception If failed.
     */
    public void testFailedNodes2() throws Exception {
        try {
            final int failOrder = 3;

            nodeSpi.set(new TestFailedNodesSpi(failOrder));

            Ignite node0 = startGrid(0);

            nodeSpi.set(new TestFailedNodesSpi(failOrder));

            startGrid(1);

            // Index of the last started node, each starter thread takes the next one.
            final AtomicInteger lastIdx = new AtomicInteger(1);

            Callable<Void> starter = new Callable<Void>() {
                @Override public Void call() throws Exception {
                    int idx = lastIdx.incrementAndGet();

                    nodeSpi.set(new TestFailedNodesSpi(failOrder));

                    startGrid(idx);

                    return null;
                }
            };

            GridTestUtils.runMultiThreaded(starter, 3, "start-node");

            Ignite node2 = ignite(2);

            waitForRemoteNodes(node2, 3);

            waitNodeStop(node0.name());

            tryCreateCache(4);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Coordinator is added in failed list during node start, test with two nodes.
     * <p>
     * Node 0 uses {@link TestFailedNodesSpi} with fail order {@code -1}, node 1 with fail order
     * {@code 2}. Node 1 is expected to see only itself; after node 0 stops, a cache is used on node 1,
     * one more node is started and caches are created on both remaining nodes.
     *
     * @throws Exception If failed.
     */
    public void testFailedNodes3() throws Exception {
        try {
            nodeSpi.set(new TestFailedNodesSpi(-1));

            Ignite node0 = startGrid(0);

            nodeSpi.set(new TestFailedNodesSpi(2));

            Ignite node1 = startGrid(1);

            assertEquals(1, node1.cluster().nodes().size());

            waitNodeStop(node0.name());

            IgniteCache<Object, Object> dfltCache = node1.getOrCreateCache(new CacheConfiguration<>());

            dfltCache.put(1, 1);

            startGrid(2);

            assertEquals(2, node1.cluster().nodes().size());

            tryCreateCache(2);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Coordinator is added in failed list during node start, but node detected failure dies before
     * sending {@link TcpDiscoveryNodeFailedMessage}.
     * <p>
     * The third node is started with {@code stopBeforeSndFail} set and is expected to stop. Then one
     * more node is started, node 0 is expected to stop, and nodes 1 and 3 must see two nodes each.
     *
     * @throws Exception If failed.
     */
    public void testFailedNodes4() throws Exception {
        try {
            final int failOrder = 3;

            nodeSpi.set(new TestFailedNodesSpi(failOrder));

            final Ignite node0 = startGrid(0);

            nodeSpi.set(new TestFailedNodesSpi(failOrder));

            Ignite node1 = startGrid(1);

            TestFailedNodesSpi dyingSpi = new TestFailedNodesSpi(failOrder);

            dyingSpi.stopBeforeSndFail = true;

            nodeSpi.set(dyingSpi);

            Ignite node2 = startGrid(2);

            waitNodeStop(node2.name());

            log.info("Try start new node.");

            Ignite node3 = startGrid(3);

            waitNodeStop(node0.name());

            for (Ignite alive : Arrays.asList(node1, node3))
                assertEquals(2, alive.cluster().nodes().size());

            tryCreateCache(2);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Adds some node in failed list after join process finished.
     * <p>
     * Runs three iterations (two nodes in the first one, three to five nodes in the others). In each
     * iteration a random node is told to fail a single message send, the node with the next order
     * (wrapping around to order {@code 1}) is expected to stop, then a new node is started and caches
     * are created on all nodes.
     *
     * @throws Exception If failed.
     */
    public void testFailedNodes5() throws Exception {
        try {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            for (int iter = 0; iter < 3; iter++) {
                final int nodesCnt = iter == 0 ? 2 : rnd.nextInt(3, 6);

                for (int idx = 0; idx < nodesCnt; idx++) {
                    nodeSpi.set(new TestFailedNodesSpi(-1));

                    startGrid(idx);
                }

                // Started nodes by their topology order.
                Map<Long, Ignite> nodesByOrder = new HashMap<>();

                for (int idx = 0; idx < nodesCnt; idx++) {
                    Ignite started = ignite(idx);

                    nodesByOrder.put(started.cluster().localNode().order(), started);
                }

                Ignite failFrom = ignite(rnd.nextInt(nodesCnt));

                log.info("Iteration [iter=" + iter + ", nodes=" + nodesCnt + ", failFrom=" + failFrom.name() + ']');

                TestFailedNodesSpi failSpi = (TestFailedNodesSpi)failFrom.configuration().getDiscoverySpi();

                failSpi.failSingleMsg = true;

                long failFromOrder = failFrom.cluster().localNode().order();

                long nextOrder;

                // Node with the maximum order is followed by the node with order 1.
                if (failFromOrder == nodesCnt)
                    nextOrder = 1;
                else
                    nextOrder = failFromOrder + 1;

                Ignite failingNode = nodesByOrder.get(nextOrder);

                assertNotNull(failingNode);

                waitNodeStop(failingNode.name());

                Ignite newNode = startGrid(nodesCnt);

                assertEquals(nodesCnt, newNode.cluster().nodes().size());

                tryCreateCache(nodesCnt);

                stopAllGrids();
            }
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts two nodes with {@link TestCustomerEventAckSpi}, sets {@code stopBeforeSndAck} on the SPI
     * of node 0 and registers a remote message listener from node 1. Node 0 is then expected to stop,
     * after which one more node is started.
     *
     * @throws Exception If failed.
     */
    public void testCustomEventAckNotSend() throws Exception {
        try {
            TestCustomerEventAckSpi ackSpi = new TestCustomerEventAckSpi();

            nodeSpi.set(ackSpi);

            Ignite node0 = startGrid(0);

            nodeSpi.set(new TestCustomerEventAckSpi());

            Ignite node1 = startGrid(1);

            ackSpi.stopBeforeSndAck = true;

            DummyPredicate lsnr = new DummyPredicate();

            node1.message().remoteListen("test", lsnr);

            waitNodeStop(node0.name());

            startGrid(2);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts node 0 with {@link TestEventDiscardSpi}, creates and destroys a cache while node 1 is in
     * topology, then restarts node 1 with duplicate check enabled on the SPI of node 0 and verifies
     * that the SPI did not send any already seen message during that restart.
     *
     * @throws Exception If failed.
     */
    public void testDiscoveryEventsDiscard() throws Exception {
        try {
            TestEventDiscardSpi discardSpi = new TestEventDiscardSpi();

            nodeSpi.set(discardSpi);

            Ignite node0 = startGrid(0);

            startGrid(1);

            CacheConfiguration<Object, Object> dfltCacheCfg = new CacheConfiguration<>();

            node0.createCache(dfltCacheCfg); // Send custom message.

            node0.destroyCache(null); // Send custom message.

            stopGrid(1);

            log.info("Start new node.");

            // Duplicates are tracked only while the new node is joining.
            discardSpi.checkDuplicates = true;

            startGrid(1);

            discardSpi.checkDuplicates = false;

            assertFalse(discardSpi.failed);
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Waits up to 10 seconds until {@link Ignition#ignite(String)} throws
     * {@link IgniteIllegalStateException} for the given node name. Dumps threads and fails the test
     * if this does not happen in time.
     *
     * @param nodeName Node name.
     * @throws Exception If failed.
     */
    private void waitNodeStop(final String nodeName) throws Exception {
        GridAbsPredicate nodeStopped = new GridAbsPredicate() {
            @Override public boolean apply() {
                try {
                    Ignition.ignite(nodeName);
                }
                catch (IgniteIllegalStateException ignored) {
                    // Node with this name is not available any more.
                    return true;
                }

                return false;
            }
        };

        boolean stopped = GridTestUtils.waitForCondition(nodeStopped, 10_000);

        if (!stopped)
            U.dumpThreads(log);

        assertTrue("Failed to wait for node stop.", stopped);
    }

    /**
     * Checks the number of started nodes and, from every node, gets or creates a cache with a
     * distinct name ({@code cache-0}, {@code cache-1}, ...) and puts one entry into it.
     *
     * @param expNodes Expected nodes number.
     */
    private void tryCreateCache(int expNodes) {
        List<Ignite> grids = G.allGrids();

        assertEquals(expNodes, grids.size());

        int idx = 0;

        for (Ignite node : grids) {
            CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();

            cacheCfg.setName("cache-" + idx);

            idx++;

            log.info("Try create cache [node=" + node.name() + ", cache=" + cacheCfg.getName() + ']');

            IgniteCache<Object, Object> cache = node.getOrCreateCache(cacheCfg);

            cache.put(1, 1);
        }
    }

    /**
     * Predicate that accepts any pair of arguments.
     */
    static class DummyPredicate implements IgniteBiPredicate<UUID, Object> {
        /** {@inheritDoc} */
        @Override public boolean apply(UUID nodeId, Object msg) {
            // Arguments are intentionally ignored.
            return true;
        }
    }


    /**
     * Discovery SPI which remembers IDs of all messages it writes to sockets and, while
     * {@code checkDuplicates} is set, marks itself as failed if a message with an already seen ID
     * is written again.
     */
    private static class TestEventDiscardSpi extends TcpDiscoverySpi {
        /** IDs of messages passed to {@link #writeToSocket}. */
        private ConcurrentHashSet<IgniteUuid> msgIds = new ConcurrentHashSet<>();

        /** If {@code true} a repeated message ID is treated as a failure. */
        private volatile boolean checkDuplicates;

        /** Set to {@code true} once a duplicate was detected. */
        private volatile boolean failed;

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock,
            TcpDiscoveryAbstractMessage msg,
            long timeout) throws IOException, IgniteCheckedException {
            boolean firstSnd = msgIds.add(msg.id());

            if (checkDuplicates) {
                if (!firstSnd) {
                    log.error("Send duplicated message: " + msg);

                    failed = true;
                }
            }

            super.writeToSocket(sock, msg, timeout);
        }
    }

    /**
     * Discovery SPI which, while {@code stopBeforeSndAck} is set, does not send a custom event message
     * wrapping {@link StartRoutineAckDiscoveryMessage}: it closes the socket and closes the local node
     * from a separate thread instead.
     */
    private static class TestCustomerEventAckSpi extends TcpDiscoverySpi {
        /** If {@code true} the node is stopped instead of sending routine start ack. */
        private volatile boolean stopBeforeSndAck;

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock,
            TcpDiscoveryAbstractMessage msg,
            long timeout) throws IOException, IgniteCheckedException {
            if (stopBeforeSndAck && msg instanceof TcpDiscoveryCustomEventMessage) {
                try {
                    TcpDiscoveryCustomEventMessage evtMsg = (TcpDiscoveryCustomEventMessage)msg;

                    // Actual custom message is taken from the 'delegate' field of the unmarshalled wrapper.
                    DiscoveryCustomMessage delegate = GridTestUtils.getFieldValue(evtMsg.message(marsh), "delegate");

                    if (delegate instanceof StartRoutineAckDiscoveryMessage) {
                        log.info("Skip message send and stop node: " + msg);

                        sock.close();

                        Callable<Object> stopNode = new Callable<Object>() {
                            @Override public Object call() throws Exception {
                                ignite.close();

                                return null;
                            }
                        };

                        GridTestUtils.runAsync(stopNode, "stop-node");

                        return;
                    }
                }
                catch (Throwable e) {
                    fail("Unexpected error: " + e);
                }
            }

            super.writeToSocket(sock, msg, timeout);
        }
    }

    /**
     * Simulate scenario when node detects node failure trying to send message, but node still alive.
     * <p>
     * Depending on its flags the SPI closes the socket and throws {@link SocketTimeoutException} instead of
     * sending a message, or silently drops messages and closes the local node.
     */
    private static class TestFailedNodesSpi extends TcpDiscoverySpi {
        /** Guards one-time failure of {@link TcpDiscoveryNodeAddedMessage} send. */
        private final AtomicBoolean failMsg = new AtomicBoolean();

        /** Local node order for which send failures are simulated. */
        private final int failOrder;

        /** If {@code true} the node is stopped instead of sending {@link TcpDiscoveryNodeFailedMessage}. */
        private boolean stopBeforeSndFail;

        /**
         * If {@code true} all messages are dropped. Volatile because the flag is set inside
         * {@link #writeToSocket} and has to be seen by subsequent calls, which may come from other threads.
         */
        private volatile boolean stop;

        /** If {@code true} the next message send fails once, then the flag is reset. */
        private volatile boolean failSingleMsg;

        /**
         * @param failOrder Spi fails connection if local node order equals to this order.
         */
        TestFailedNodesSpi(int failOrder) {
            this.failOrder = failOrder;
        }

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock,
            TcpDiscoveryAbstractMessage msg,
            long timeout) throws IOException, IgniteCheckedException {
            if (stop)
                return;

            if (failSingleMsg) {
                failSingleMsg = false;

                log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']');

                sock.close();

                throw new SocketTimeoutException();
            }

            // Fail the first node added message sent by the node with configured order.
            if (locNode.internalOrder() == failOrder &&
                (msg instanceof TcpDiscoveryNodeAddedMessage) &&
                failMsg.compareAndSet(false, true)) {
                log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']');

                sock.close();

                throw new SocketTimeoutException();
            }

            if (stopBeforeSndFail &&
                locNode.internalOrder() == failOrder &&
                (msg instanceof TcpDiscoveryNodeFailedMessage)) {
                stop = true;

                log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']');

                sock.close();

                // Node is closed from a separate thread, this call just returns without sending.
                GridTestUtils.runAsync(new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        ignite.close();

                        return null;
                    }
                }, "stop-node");

                return;
            }

            super.writeToSocket(sock, msg, timeout);
        }
    }

    /**
     * Discovery SPI which, once a latch is set, stops sending any messages starting from the first
     * {@link TcpDiscoveryCustomEventMessage} it is asked to send, and counts the latch down at that moment.
     */
    private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi {
        /** Latch counted down when a custom event message is about to be sent; {@code null} disables the logic. */
        private volatile CountDownLatch latch;

        /**
         * If {@code true} all messages are dropped. Volatile because the flag is set inside
         * {@link #writeToSocket} and has to be seen by subsequent calls, which may come from other threads.
         */
        private volatile boolean stop;

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
            long timeout) throws IOException, IgniteCheckedException {
            if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) {
                log.info("Stop node on custom event: " + msg);

                latch.countDown();

                stop = true;
            }

            // The custom event message itself is dropped as well.
            if (stop)
                return;

            super.writeToSocket(sock, msg, timeout);
        }
    }

    /**
     * Discovery SPI which can hold sending of {@link TcpDiscoveryNodeAddedMessage}: when the first latch
     * is set, the SPI counts it down and waits for the second latch before the message is processed
     * further. With {@code stop} set node added messages are dropped. Optionally logs sent messages.
     */
    private static class TestCustomEventRaceSpi extends TcpDiscoverySpi {
        /** Counted down when a node added message is about to be sent. */
        private volatile CountDownLatch nodeAdded1;

        /** Awaited before a node added message is sent. */
        private volatile CountDownLatch nodeAdded2;

        /** If {@code true} node added messages are not sent. */
        private volatile boolean stop;

        /**
         * If {@code true} sent messages are logged. Volatile because the flag is switched by the test
         * after the node is started, like the other flags of this class.
         */
        private volatile boolean debug;

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
            long timeout) throws IOException, IgniteCheckedException {
            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                if (nodeAdded1 != null) {
                    nodeAdded1.countDown();

                    if (debug)
                        log.info("--- Wait node added: " + msg);

                    U.await(nodeAdded2);

                    // Only one message send is held, latches are one-shot.
                    nodeAdded1 = null;
                    nodeAdded2 = null;
                }

                if (stop)
                    return;

                if (debug)
                    log.info("--- Send node added: " + msg);
            }

            if (debug && msg instanceof TcpDiscoveryNodeAddFinishedMessage)
                log.info("--- Send node finished: " + msg);

            if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
                log.info("--- Send custom event: " + msg);

            super.writeToSocket(sock, msg, timeout);
        }
    }

    /**
     * Discovery SPI which throws {@link RuntimeException} from {@link #writeToSocket} once
     * {@code stop} is set.
     */
    private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi {
        /** If {@code true} every message send fails. */
        private volatile boolean stop;

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
            long timeout) throws IOException, IgniteCheckedException {
            if (!stop) {
                super.writeToSocket(sock, msg, timeout);

                return;
            }

            throw new RuntimeException("Failing ring message worker explicitly");
        }
    }

    /**
     * Discovery SPI which sends messages normally until the first {@link TcpDiscoveryNodeAddedMessage}
     * has been sent; every later send throws {@link RuntimeException}.
     */
    private static class TestMessageWorkerFailureSpi2 extends TcpDiscoverySpi {
        /** Set after the first node added message is sent, makes all further sends fail. */
        private volatile boolean stop;

        /** {@inheritDoc} */
        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
            long timeout) throws IOException, IgniteCheckedException {
            if (stop)
                throw new RuntimeException("Failing ring message worker explicitly");

            boolean nodeAdded = msg instanceof TcpDiscoveryNodeAddedMessage;

            super.writeToSocket(sock, msg, timeout);

            // The flag is switched only if the send above completed without an exception.
            if (nodeAdded)
                stop = true;
        }
    }

    /**
     * Starts new grid with given index. Method optimize is not invoked.
     * The grid name is derived from the index with {@code getTestGridName(int)}.
     *
     * @param idx Index of the grid to start.
     * @return Started grid.
     * @throws Exception If anything failed.
     */
    private Ignite startGridNoOptimize(int idx) throws Exception {
        String gridName = getTestGridName(idx);

        return startGridNoOptimize(gridName);
    }

    /**
     * Starts new grid with given name. Method optimize is not invoked.
     * The node is started directly with the configuration returned by {@code getConfiguration(String)}.
     *
     * @param gridName Grid name.
     * @return Started grid.
     * @throws Exception If failed.
     */
    private Ignite startGridNoOptimize(String gridName) throws Exception {
        IgniteConfiguration cfg = getConfiguration(gridName);

        return G.start(cfg);
    }
}
