IGNITE-7163 Validate connection from a pre-previous node. - Fixes #4088.

Signed-off-by: Dmitriy Pavlov <dpavlov@apache.org>
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index bb76895..50bb383 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -251,6 +251,9 @@
     /** Discovery state. */
     protected TcpDiscoverySpiState spiState = DISCONNECTED;
 
+    /** Last time received message from ring. */
+    private volatile long lastRingMsgReceivedTime;
+
     /** Map with proceeding ping requests. */
     private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap =
         new ConcurrentHashMap<>();
@@ -313,11 +316,18 @@
     }
 
     /** {@inheritDoc} */
+    @Override public long connectionCheckInterval() {
+        return msgWorker.connCheckFreq;
+    }
+
+    /** {@inheritDoc} */
     @Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
         synchronized (mux) {
             spiState = DISCONNECTED;
         }
 
+        lastRingMsgReceivedTime = 0;
+
         utilityPool = new IgniteThreadPoolExecutor("disco-pool",
             spi.ignite().name(),
             0,
@@ -2726,8 +2736,8 @@
 
             assert connCheckFreq > 0;
 
-            if (log.isDebugEnabled())
-                log.debug("Connection check frequency is calculated: " + connCheckFreq);
+            if (log.isInfoEnabled())
+                log.info("Connection check frequency is calculated: " + connCheckFreq);
         }
 
         /**
@@ -2905,7 +2915,7 @@
 
             sendMessageToClients(msg);
 
-            Collection<TcpDiscoveryNode> failedNodes;
+            List<TcpDiscoveryNode> failedNodes;
 
             TcpDiscoverySpiState state;
 
@@ -2921,9 +2931,12 @@
 
             boolean newNextNode = false;
 
+            // Used only if spi.getEffectiveConnectionRecoveryTimeout > 0
+            CrossRingMessageSendState sndState = null;
+
             UUID locNodeId = getLocalNodeId();
 
-            while (true) {
+            ringLoop: while (true) {
                 TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
 
                 if (newNext == null) {
@@ -2947,6 +2960,8 @@
                     if (log.isDebugEnabled())
                         log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
                             ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+                    else if (log.isInfoEnabled())
+                        log.info("New next node [newNext=" + newNext + ']');
 
                     if (debugMode)
                         debugLog(msg, "New next node [newNext=" + newNext + ", formerNext=" + next +
@@ -3003,12 +3018,52 @@
                                 openSock = true;
 
                                 // Handshake.
-                                spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId),
+                                TcpDiscoveryHandshakeRequest hndMsg = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+                                // Topology treated as changes if next node is not available.
+                                hndMsg.changeTopology(sndState != null && !sndState.isStartingPoint());
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + ']');
+
+                                spi.writeToSocket(sock, out, hndMsg,
                                     timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
                                     timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
+                                if (log.isDebugEnabled())
+                                    log.debug("Handshake response: " + res);
+
+                                if (res.previousNodeAlive() && sndState != null) {
+                                    // Remote node checked connection to it's previous and got success.
+                                    boolean previousNode = sndState.markLastFailedNodeAlive();
+
+                                    if (previousNode)
+                                        failedNodes.remove(failedNodes.size() - 1);
+                                    else {
+                                        newNextNode = false;
+
+                                        next = ring.nextNode(failedNodes);
+                                    }
+
+                                    U.closeQuiet(sock);
+
+                                    sock = null;
+
+                                    if (sndState.isFailed()) {
+                                        segmentLocalNodeOnSendFail();
+
+                                        return; // Nothing to do here.
+                                    }
+
+                                    if (previousNode)
+                                        U.warn(log, "New next node has connection to it's previous, trying previous " +
+                                            "again. [next=" + next + ']');
+
+                                    continue ringLoop;
+                                }
+
                                 if (locNodeId.equals(res.creatorNodeId())) {
                                     if (log.isDebugEnabled())
                                         log.debug("Handshake response from local node: " + res);
@@ -3298,7 +3353,12 @@
                 } // Iterating node's addresses.
 
                 if (!sent) {
-                    if (!failedNodes.contains(next)) {
+                    if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0)
+                        sndState = new CrossRingMessageSendState();
+
+                    boolean failedNextNode = sndState == null || sndState.markNextNodeFailed();
+
+                    if (failedNextNode && !failedNodes.contains(next)) {
                         failedNodes.add(next);
 
                         if (state == CONNECTED) {
@@ -3313,6 +3373,26 @@
                                 ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
                         }
                     }
+                    else if (!failedNextNode && sndState != null && sndState.isBackward()) {
+                        boolean prev = sndState.markLastFailedNodeAlive();
+
+                        U.warn(log, "Failed to send message to next node, try previous [msg=" + msg +
+                            ", next=" + next + ']');
+
+                        if (prev)
+                            failedNodes.remove(failedNodes.size() - 1);
+                        else {
+                            newNextNode = false;
+
+                            next = ring.nextNode(failedNodes);
+                        }
+                    }
+
+                    if (sndState != null && sndState.isFailed()) {
+                        segmentLocalNodeOnSendFail();
+
+                        return; // Nothing to do here.
+                    }
 
                     next = null;
 
@@ -3384,6 +3464,23 @@
         }
 
         /**
+         * Segment local node on failed message send.
+         */
+        private void segmentLocalNodeOnSendFail() {
+            U.warn(log, "Unable to connect to next nodes in a ring, " +
+                "it seems local node is experiencing connectivity issues. Segmenting local node " +
+                "to avoid case when one node fails a big part of cluster. To disable" +
+                " that behavior set TcpDiscoverySpi.setConnectionRecoveryTimeout() to 0. " +
+                "[connRecoveryTimeout=" + spi.connRecoveryTimeout + ", effectiveConnRecoveryTimeout="
+                + spi.getEffectiveConnectionRecoveryTimeout() + ']');
+
+            // Remove any queued messages to avoid new connect tries.
+            queue.clear();
+
+            notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode);
+        }
+
+        /**
          * @param msg Message.
          * @return Whether to redirect message to client nodes.
          */
@@ -5923,6 +6020,60 @@
 
                     if (req.client())
                         res.clientAck(true);
+                    else if (req.changeTopology()) {
+                        // Node cannot connect to it's next (for local node it's previous).
+                        // Need to check connectivity to it.
+                        long rcvdTime = lastRingMsgReceivedTime;
+                        long now = U.currentTimeMillis();
+
+                        // We got message from previous in less than double connection check interval.
+                        boolean ok = rcvdTime + msgWorker.connCheckFreq * 2 >= now;
+
+                        if (ok) {
+                            // Check case when previous node suddenly died. This will speed up
+                            // node failing.
+                            Set<TcpDiscoveryNode> failed;
+
+                            synchronized (mux) {
+                                failed = failedNodes.keySet();
+                            }
+
+                            TcpDiscoveryNode previous = ring.previousNode(failed);
+
+                            InetSocketAddress liveAddr = null;
+
+                            if (previous != null && !previous.id().equals(nodeId)) {
+                                Collection<InetSocketAddress> nodeAddrs =
+                                    spi.getNodeAddresses(previous, false);
+
+                                for (InetSocketAddress addr : nodeAddrs) {
+                                    // Connection refused may be got if node doesn't listen
+                                    // (or blocked by firewall, but anyway assume it is dead).
+                                    if (!isConnectionRefused(addr)) {
+                                        liveAddr = addr;
+
+                                        break;
+                                    }
+                                }
+
+                                if (log.isInfoEnabled())
+                                    log.info("Connection check done: [liveAddr=" + liveAddr
+                                        + ", previousNode=" + previous + ", addressesToCheck=" + nodeAddrs
+                                        + ", connectingNodeId=" + nodeId + ']');
+                            }
+
+                            // If local node was able to connect to previous, confirm that it's alive.
+                            ok = liveAddr != null && (!liveAddr.getAddress().isLoopbackAddress()
+                                || !locNode.socketAddresses().contains(liveAddr));
+                        }
+
+                        res.previousNodeAlive(ok);
+
+                        if (log.isInfoEnabled()) {
+                            log.info("Previous node alive: [alive=" + ok + ", lastMessageReceivedTime="
+                                + rcvdTime + ", now=" + now + ", connCheckFreq=" + msgWorker.connCheckFreq + ']');
+                        }
+                    }
 
                     spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock));
 
@@ -6062,6 +6213,8 @@
                             debugLog(msg, "Message has been received: " + msg);
 
                         if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+                            ringMessageReceived();
+
                             spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                             continue;
@@ -6239,6 +6392,8 @@
                             continue;
                         }
                         else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) {
+                            ringMessageReceived();
+
                             if (log.isInfoEnabled())
                                 log.info("Latency check message has been read: " + msg.id());
 
@@ -6249,8 +6404,11 @@
 
                         if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage)
                             metricsUpdateMsg = (TcpDiscoveryClientMetricsUpdateMessage)msg;
-                        else
+                        else {
+                            ringMessageReceived();
+
                             msgWorker.addMessage(msg);
+                        }
 
                         // Send receipt back.
                         if (clientMsgWrk != null) {
@@ -6339,6 +6497,31 @@
         }
 
         /**
+         * Update last ring message received timestamp.
+         */
+        private void ringMessageReceived() {
+            lastRingMsgReceivedTime = U.currentTimeMillis();
+        }
+
+        /**
+         * @param addr Address to check.
+         * @return {@code True} if got connection refused on connect try.
+         */
+        private boolean isConnectionRefused(SocketAddress addr) {
+            try (Socket sock = new Socket()) {
+                sock.connect(addr, 100);
+            }
+            catch (ConnectException e) {
+                return true;
+            }
+            catch (IOException e) {
+                return false;
+            }
+
+            return false;
+        }
+
+        /**
          * Processes client reconnect message.
          *
          * @param msg Client reconnect message.
@@ -6967,4 +7150,131 @@
             this.sock = sock;
         }
     }
+
+    /**
+     *
+     */
+    private enum RingMessageSendState {
+        /** */
+        STARTING_POINT,
+
+        /** */
+        FORWARD_PASS,
+
+        /** */
+        BACKWARD_PASS,
+
+        /** */
+        FAILED
+    }
+
+    /**
+     * Initial state is {@link RingMessageSendState#STARTING_POINT}.<br>
+     * States could be switched:<br>
+     * {@link RingMessageSendState#STARTING_POINT} => {@link RingMessageSendState#FORWARD_PASS} when next node failed.<br>
+     * {@link RingMessageSendState#FORWARD_PASS} => {@link RingMessageSendState#FORWARD_PASS} when new next node failed.<br>
+     * {@link RingMessageSendState#FORWARD_PASS} => {@link RingMessageSendState#BACKWARD_PASS} when new next node has
+     * connection to it's previous node and forces local node to try it again.<br>
+     * {@link RingMessageSendState#BACKWARD_PASS} => {@link RingMessageSendState#BACKWARD_PASS} when previously tried node
+     * has connection to it's previous and forces local node to try it again.<br>
+     * {@link RingMessageSendState#BACKWARD_PASS} => {@link RingMessageSendState#STARTING_POINT} when local node came back
+     * to initial next node and no topology changes should be performed.<br>
+     * {@link RingMessageSendState#BACKWARD_PASS} => {@link RingMessageSendState#FAILED} when recovery timeout is over and
+     * all new next nodes have connections to their previous nodes. That means local node has connectivity
+     * issue and should be stopped.<br>
+     */
+    private class CrossRingMessageSendState {
+        /** */
+        private RingMessageSendState state = RingMessageSendState.STARTING_POINT;
+
+        /** */
+        private int failedNodes;
+
+        /** */
+        private final long failTime;
+
+        /**
+         *
+         */
+        CrossRingMessageSendState() {
+            failTime = spi.getEffectiveConnectionRecoveryTimeout() + U.currentTimeMillis();
+        }
+
+        /**
+         * @return {@code True} if state is {@link RingMessageSendState#STARTING_POINT}.
+         */
+        boolean isStartingPoint() {
+            return state == RingMessageSendState.STARTING_POINT;
+        }
+
+        /**
+         * @return {@code True} if state is {@link RingMessageSendState#BACKWARD_PASS}.
+         */
+        boolean isBackward() {
+            return state == RingMessageSendState.BACKWARD_PASS;
+        }
+
+        /**
+         * @return {@code True} if state is {@link RingMessageSendState#FAILED}.
+         */
+        boolean isFailed() {
+            return state == RingMessageSendState.FAILED;
+        }
+
+        /**
+         * Marks next node as failed.
+         *
+         * @return {@code True} node marked as failed.
+         */
+        boolean markNextNodeFailed() {
+            if (state == RingMessageSendState.STARTING_POINT || state == RingMessageSendState.FORWARD_PASS) {
+                state = RingMessageSendState.FORWARD_PASS;
+
+                failedNodes++;
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * Marks last failed node as alive.
+         *
+         * @return {@code False} if all failed nodes marked as alive or incorrect state.
+         */
+        boolean markLastFailedNodeAlive() {
+            if (state == RingMessageSendState.FORWARD_PASS || state == RingMessageSendState.BACKWARD_PASS) {
+                state = RingMessageSendState.BACKWARD_PASS;
+
+                if (--failedNodes <= 0) {
+                    failedNodes = 0;
+
+                    if (U.currentTimeMillis() >= failTime) {
+                        state = RingMessageSendState.FAILED;
+
+                        return false;
+                    }
+
+                    state = RingMessageSendState.STARTING_POINT;
+
+                    try {
+                        Thread.sleep(200);
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CrossRingMessageSendState.class, this);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 00d83dd..64cc2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -248,6 +248,13 @@
     }
 
     /**
+     * @return connection check interval.
+     */
+    public long connectionCheckInterval() {
+        return 0;
+    }
+
+    /**
      * @throws IgniteSpiException If failed.
      */
     public abstract void spiStop() throws IgniteSpiException;
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 801f2b6..0c7a56f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -55,9 +55,9 @@
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
-import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -277,6 +277,9 @@
     /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
     public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
 
+    /** Default connection recovery timeout in ms. */
+    public static final long DFLT_CONNECTION_RECOVERY_TIMEOUT = IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT;
+
     /** Ssl message pattern for StreamCorruptedException. */
     private static Pattern sslMsgPattern = Pattern.compile("invalid stream header: 150\\d0\\d00");
 
@@ -311,6 +314,9 @@
     /** Size of topology snapshots history. */
     protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
 
+    /** Default connection recovery timeout in ms. */
+    protected long connRecoveryTimeout = DFLT_CONNECTION_RECOVERY_TIMEOUT;
+
     /** Grid discovery listener. */
     protected volatile DiscoverySpiListener lsnr;
 
@@ -994,6 +1000,54 @@
         return this;
     }
 
+    /**
+     * Gets timeout that defines how long server node would try to recovery connection.<br>
+     * See {@link #setConnectionRecoveryTimeout(long)} for details.
+     *
+     * @return Timeout that defines how long server node would try to recovery connection.
+     */
+    public long getConnectionRecoveryTimeout() {
+        return connRecoveryTimeout;
+    }
+
+    /**
+     * @return Connection recovery timeout that is not greater than failureDetectionTimeout if enabled.
+     */
+    long getEffectiveConnectionRecoveryTimeout() {
+        if (failureDetectionTimeoutEnabled() && failureDetectionTimeout() < connRecoveryTimeout)
+            return failureDetectionTimeout();
+
+        return connRecoveryTimeout;
+    }
+
+    /**
+     * Sets timeout that defines how long server node would try to recovery connection.
+     * <p>In case local node has temporary connectivity issues with part of the cluster,
+     * it may sequentially fail nodes one-by-one till successfully connect to one that
+     * has a fine connection with.
+     * This leads to fail of big number of nodes.
+     * </p>
+     * <p>
+     *     To overcome that issue, local node will do a sequential connection tries to next
+     *     nodes. But if new next node has connection to previous it forces local node to
+     *     retry connect to previous. These tries will last till timeout will not
+     *     finished. When timeout is over, but no success in connecting to nodes it will
+     *     segment itself.
+     * </p>
+     * <p>
+     *     Cannot be greater than {@link #failureDetectionTimeout()}.
+     * </p>
+     * <p>
+     *     Default is {@link #DFLT_CONNECTION_RECOVERY_TIMEOUT}.
+     * </p>
+     *
+     * @param connRecoveryTimeout Timeout that defines how long server node would try to recovery connection.
+     * {@code 0} means node will not recheck failed nodes.
+     */
+    public void setConnectionRecoveryTimeout(long connRecoveryTimeout) {
+        this.connRecoveryTimeout = connRecoveryTimeout;
+    }
+
     /** {@inheritDoc} */
     @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
         assert locNodeAttrs == null;
@@ -2438,6 +2492,11 @@
         }
 
         /** {@inheritDoc} */
+        @Override public long getConnectionCheckInterval() {
+            return impl.connectionCheckInterval();
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean isClientMode() {
             return TcpDiscoverySpi.this.isClientMode();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
index cb0fd36..176cc07 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -56,6 +56,14 @@
     public int getReconnectCount();
 
     /**
+     * Gets connection check interval in ms.
+     *
+     * @return Number of connection attempts.
+     */
+    @MXBeanDescription("Connection check interval.")
+    public long getConnectionCheckInterval();
+
+    /**
      * Gets network timeout.
      *
      * @return Network timeout.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 54ddc9e..7fc394b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -17,18 +17,6 @@
 
 package org.apache.ignite.spi.discovery.tcp.internal;
 
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.PN;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.jetbrains.annotations.Nullable;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,6 +28,17 @@
 import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.PN;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
@@ -506,6 +505,42 @@
     }
 
     /**
+     * Finds previous node in the topology filtering excluded nodes from search.
+     * <p>
+     * This may be used when detecting and handling nodes failure.
+     *
+     * @param excluded Nodes to exclude from the search (optional). If provided,
+     * cannot contain local node.
+     * @return Previous node or {@code null} if all nodes were filtered out or
+     * topology contains less than two nodes.
+     */
+    @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+        rwLock.readLock().lock();
+
+        try {
+            Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+            if (filtered.size() < 2)
+                return null;
+
+            TcpDiscoveryNode previous = null;
+
+            // Get last node that is previous in a ring
+            for (TcpDiscoveryNode node : filtered) {
+                if (locNode.equals(node) && previous != null)
+                    break;
+
+                previous = node;
+            }
+
+            return previous;
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
      * Gets current topology version.
      *
      * @return Current topology version.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index a23cb63..93d8bcf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -46,6 +46,9 @@
     protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
 
     /** */
+    protected static final int CHANGE_TOPOLOGY_FLAG_POS = 3;
+
+    /** */
     protected static final int CLIENT_ACK_FLAG_POS = 4;
 
     /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
index ea5b868..90c9f94 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
@@ -36,8 +36,29 @@
         super(creatorNodeId);
     }
 
+    /**
+     * Gets topology change flag.<br>
+     * {@code True} means node intent to fail nodes in a ring.
+     *
+     * @return Change topology flag.
+     */
+    public boolean changeTopology() {
+        return getFlag(CHANGE_TOPOLOGY_FLAG_POS);
+    }
+
+    /**
+     * Gets topology change flag.<br>
+     * {@code True} means node intent to fail nodes in a ring.
+     *
+     * @param changeTop Change topology flag.
+     */
+    public void changeTopology(boolean changeTop) {
+        setFlag(CHANGE_TOPOLOGY_FLAG_POS, changeTop);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString());
+        return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString(),
+            "isChangeTopology", changeTopology());
     }
 }
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 0d350af..75df5c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -43,6 +43,26 @@
     }
 
     /**
+     * Gets previous node alive flag.<br>
+     * {@code True} means node has connectivity to it's previous node in a ring.
+     *
+     * @return previous node alive flag.
+     */
+    public boolean previousNodeAlive() {
+        return getFlag(CHANGE_TOPOLOGY_FLAG_POS);
+    }
+
+    /**
+     * Sets topology change flag.<br>
+     * {@code True} means node has connectivity to it's previous node in a ring.
+     *
+     * @param prevNodeAlive previous node alive flag.
+     */
+    public void previousNodeAlive(boolean prevNodeAlive) {
+        setFlag(CHANGE_TOPOLOGY_FLAG_POS, prevNodeAlive);
+    }
+
+    /**
      * Gets order of the node sent the response.
      *
      * @return Order of the node sent the response.
@@ -76,6 +96,7 @@
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString());
+        return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString(),
+            "isPreviousNodeAlive", previousNodeAlive());
     }
 }
\ No newline at end of file
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
new file mode 100644
index 0000000..32ce978
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests checks case when one node is unable to connect to next in a ring,
+ * but those nodes are not experiencing any connectivity troubles between
+ * each other.
+ */
+public class IgniteDiscoveryMassiveNodeFailTest extends GridCommonAbstractTest {
+    /** */
+    private static final int FAILURE_DETECTION_TIMEOUT = 5_000;
+
+    /** */
+    private Set<InetSocketAddress> failedAddrs = new GridConcurrentHashSet<>();
+
+    /** */
+    private volatile TcpDiscoveryNode compromisedNode;
+
+    /** */
+    private volatile boolean forceFailConnectivity;
+
+    /** */
+    private volatile boolean failNodes;
+
+    /** */
+    private long timeout;
+
+    /** */
+    private volatile Set<ClusterNode> failedNodes = Collections.emptySet();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        FailDiscoverySpi disco = new FailDiscoverySpi();
+
+        disco.setIpFinder(LOCAL_IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        disco.setConnectionRecoveryTimeout(timeout);
+
+        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "false");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        timeout = 2_000;
+        failNodes = false;
+        forceFailConnectivity = false;
+    }
+
+    /**
+     * Node fails 2 nodes when connection check is disabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMassiveFailDisabledRecovery() throws Exception {
+        timeout = 0; // Disable previous node check.
+
+        doFailNodes(false);
+    }
+
+    /**
+     *
+     */
+    private void doFailNodes(boolean simulateNodeFailure) throws Exception {
+        startGrids(5);
+
+        grid(0).events().enabledEvents();
+
+        failedNodes = new HashSet<>(Arrays.asList(grid(3).cluster().localNode(), grid(4).cluster().localNode()));
+
+        CountDownLatch latch = new CountDownLatch(failedNodes.size());
+
+        grid(0).events().localListen(e -> {
+            DiscoveryEvent evt = (DiscoveryEvent)e;
+
+            if (failedNodes.contains(evt.eventNode()))
+                latch.countDown();
+
+            return true;
+        }, EventType.EVT_NODE_FAILED);
+
+        compromisedNode = (TcpDiscoveryNode)grid(2).localNode();
+
+        for (int i = 3; i < 5; i++)
+            failedAddrs.addAll(((TcpDiscoveryNode)grid(i).localNode()).socketAddresses());
+
+        System.out.println(">> Start failing nodes");
+
+        forceFailConnectivity = true;
+
+        if (simulateNodeFailure) {
+            for (int i = 3; i < 5; i++)
+                ((TcpDiscoverySpi)grid(i).configuration().getDiscoverySpi()).simulateNodeFailure();
+        }
+
+        assert latch.await(waitTime(), TimeUnit.MILLISECONDS);
+
+        assertEquals(3, grid(0).cluster().forServers().nodes().size());
+    }
+
+    /**
+     *
+     */
+    private long waitTime() {
+        return timeout + 5000;
+    }
+
+    /**
+     * Node fail itself.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMassiveFailSelfKill() throws Exception {
+        startGrids(5);
+
+        grid(0).events().enabledEvents();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        grid(0).events().localListen((e) -> {
+            DiscoveryEvent evt = (DiscoveryEvent)e;
+
+            if (evt.eventNode().equals(compromisedNode))
+                latch.countDown();
+
+            return true;
+        }, EventType.EVT_NODE_FAILED);
+
+        compromisedNode = (TcpDiscoveryNode)grid(2).localNode();
+
+        for (int i = 3; i < 5; i++)
+            failedAddrs.addAll(((TcpDiscoveryNode)grid(i).localNode()).socketAddresses());
+
+        System.out.println(">> Start failing nodes");
+
+        forceFailConnectivity = true;
+
+        assert latch.await(waitTime(), TimeUnit.MILLISECONDS);
+
+        assertEquals(4, grid(0).cluster().forServers().nodes().size());
+    }
+
+    /**
+     * When connectivity restored, no topology changes will be applied.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMassiveFailAndRecovery() throws Exception {
+        startGrids(5);
+
+        grid(0).events().enabledEvents();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        grid(0).events().localListen(e -> {
+            DiscoveryEvent evt = (DiscoveryEvent)e;
+
+            if (evt.eventNode().equals(compromisedNode))
+                latch.countDown();
+
+            return true;
+        }, EventType.EVT_NODE_FAILED);
+
+        compromisedNode = (TcpDiscoveryNode)grid(2).localNode();
+
+        for (int i = 3; i < 5; i++)
+            failedAddrs.addAll(((TcpDiscoveryNode)grid(i).localNode()).socketAddresses());
+
+        System.out.println(">> Start failing nodes");
+
+        forceFailConnectivity = true;
+
+        doSleep(timeout / 4); // wait 1 try
+
+        forceFailConnectivity = false;
+
+        System.out.println(">> Stop failing nodes");
+
+        assert !latch.await(waitTime(), TimeUnit.MILLISECONDS);
+
+        // Topology is not changed
+        assertEquals(5, grid(0).cluster().forServers().nodes().size());
+        assertEquals(5, grid(0).cluster().topologyVersion());
+    }
+
+    /**
+     * Regular nodes fail by timeout.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMassiveFail() throws Exception {
+        failNodes = true;
+
+        // Must be greater than failureDetectionTimeout / 3 as it calculated into
+        // connection check frequency.
+        timeout = FAILURE_DETECTION_TIMEOUT;
+
+        doFailNodes(false);
+    }
+
+    /**
+     * Regular node fail by crash. Should be faster due to
+     *
+     *
+     * @throws Exception If failed.
+     */
+    public void testMassiveFailForceNodeFail() throws Exception {
+        failNodes = true;
+
+        // Must be greater than failureDetectionTimeout / 3 as it calculated into
+        // connection check frequency.
+        timeout = FAILURE_DETECTION_TIMEOUT / 2;
+
+        doFailNodes(true);
+    }
+
+    /**
+     * Check that cluster recovers from temporal connection breakage.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryOnDisconnect() throws Exception {
+        startGrids(3);
+
+        IgniteEx ignite1 = grid(1);
+        IgniteEx ignite2 = grid(2);
+
+        ((TcpDiscoverySpi)ignite1.configuration().getDiscoverySpi()).brakeConnection();
+        ((TcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).brakeConnection();
+
+        doSleep(FAILURE_DETECTION_TIMEOUT);
+
+        assertEquals(3, grid(0).cluster().nodes().size());
+        assertEquals(3, grid(1).cluster().nodes().size());
+        assertEquals(3, grid(2).cluster().nodes().size());
+    }
+
+    /**
+     *
+     */
+    private class FailDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+            long timeout) throws IOException {
+            assertNotFailedNode(sock);
+
+            if (isDrop(msg))
+                return;
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            assertNotFailedNode(sock);
+
+            if (isDrop(msg))
+                return;
+
+            super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(ClusterNode node, Socket sock, OutputStream out,
+            TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException {
+            assertNotFailedNode(sock);
+
+            if (isDrop(msg))
+                return;
+
+            super.writeToSocket(node, sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            assertNotFailedNode(sock);
+
+            if (isDrop(msg))
+                return;
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /**
+         *
+         */
+        private boolean isDrop(TcpDiscoveryAbstractMessage msg) {
+            boolean drop = failNodes && forceFailConnectivity && failedNodes.contains(ignite.cluster().localNode());
+
+            if (drop)
+                ignite.log().info(">> Drop message " + msg);
+
+            return drop;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+            long timeout) throws IOException {
+            assertNotFailedNode(sock);
+
+            if (isDrop(msg))
+                return;
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
+        /**
+         * @param sock Socket.
+         * @throws IOException To break connection.
+         */
+        @SuppressWarnings("SuspiciousMethodCalls")
+        private void assertNotFailedNode(Socket sock) throws IOException {
+            if (forceFailConnectivity && getLocalNode().equals(compromisedNode) && failedAddrs.contains(sock.getRemoteSocketAddress())) {
+                log.info(">> Force fail connection " + sock.getRemoteSocketAddress());
+
+                throw new IOException("Force fail connection " + sock.getRemoteSocketAddress());
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index f1c826a..c167a90 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -62,6 +62,9 @@
     /** */
     private static boolean useTestSpi;
 
+    /** */
+    private static boolean disableTopChangeRecovery;
+
     /** {@inheritDoc} */
     @Override protected boolean useFailureDetectionTimeout() {
         return true;
@@ -89,7 +92,19 @@
 
     /** {@inheritDoc} */
     @Override protected TcpDiscoverySpi getDiscoverySpi() {
-        return useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
+        TcpDiscoverySpi spi = useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
+
+        if (disableTopChangeRecovery)
+            spi.setConnectionRecoveryTimeout(0);
+
+        return spi;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        disableTopChangeRecovery = false;
     }
 
     /**
@@ -202,6 +217,7 @@
         failureThreshold = 1000;
         clientFailureDetectionTimeout = 10000;
         useTestSpi = true;
+        disableTopChangeRecovery = true;
 
         try {
             startServerNodes(3);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
index df76afc..7d4d802 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
@@ -45,6 +45,9 @@
         if (igniteInstanceName.endsWith("2"))
             cfg.setFailureHandler(new TestFailureHandler());
 
+        // Disable recovery
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setConnectionRecoveryTimeout(0);
+
         return cfg;
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index d50a967..1aae8fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -90,6 +90,7 @@
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -1675,15 +1676,15 @@
         try {
             final int FAIL_ORDER = 3;
 
-            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+            nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
             final Ignite ignite0 = startGrid(0);
 
-            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+            nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
             startGrid(1);
 
-            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+            nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
             Ignite ignite2 = startGrid(2);
 
@@ -1699,6 +1700,18 @@
     }
 
     /**
+     * @param failOrder Fail order.
+     * @return Failed node spi.
+     */
+    @NotNull private TestFailedNodesSpi createFailedNodeSpi(int failOrder) {
+        TestFailedNodesSpi spi = new TestFailedNodesSpi(failOrder);
+
+        spi.setConnectionRecoveryTimeout(0);
+
+        return spi;
+    }
+
+    /**
      * Coordinator is added in failed list, concurrent nodes start.
      *
      * @throws Exception If failed.
@@ -1707,11 +1720,11 @@
         try {
             final int FAIL_ORDER = 3;
 
-            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+            nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
             Ignite ignite0 = startGrid(0);
 
-            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+            nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
             startGrid(1);
 
@@ -1721,7 +1734,7 @@
                 @Override public Void call() throws Exception {
                     int idx = nodeIdx.incrementAndGet();
 
-                    nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+                    nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
                     startGrid(idx);
 
@@ -1749,11 +1762,11 @@
      */
     public void testFailedNodes3() throws Exception {
         try {
-            nodeSpi.set(new TestFailedNodesSpi(-1));
+            nodeSpi.set(createFailedNodeSpi(-1));
 
             Ignite ignite0 = startGrid(0);
 
-            nodeSpi.set(new TestFailedNodesSpi(2));
+            nodeSpi.set(createFailedNodeSpi(2));
 
             Ignite ignite1 = startGrid(1);
 
@@ -1784,15 +1797,15 @@
         try {
             final int FAIL_ORDER = 3;
 
-            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+            nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
             final Ignite ignite0 = startGrid(0);
 
-            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+            nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
 
             Ignite ignite1 = startGrid(1);
 
-            TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER);
+            TestFailedNodesSpi spi = createFailedNodeSpi(FAIL_ORDER);
 
             spi.stopBeforeSndFail = true;
 
@@ -1831,7 +1844,7 @@
                 final int NODES = iter == 0 ? 2 : rnd.nextInt(3, 6);
 
                 for (int i = 0; i < NODES; i++) {
-                    nodeSpi.set(new TestFailedNodesSpi(-1));
+                    nodeSpi.set(createFailedNodeSpi(-1));
 
                     startGrid(i);
                 }
@@ -2084,7 +2097,11 @@
             TestRestoreConnectedSpi.startTest = false;
 
             for (int i = 1; i < 5; i++) {
-                nodeSpi.set(new TestRestoreConnectedSpi(3));
+                TestRestoreConnectedSpi spi = new TestRestoreConnectedSpi(3);
+
+                spi.setConnectionRecoveryTimeout(0);
+
+                nodeSpi.set(spi);
 
                 startGrid(i);
             }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index ef582a5..1d10b4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.IgniteDiscoveryMassiveNodeFailTest;
 import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
 import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
 import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
@@ -106,6 +107,8 @@
 
         suite.addTest(new TestSuite(TcpDiscoverySpiReconnectDelayTest.class));
 
+        suite.addTest(new TestSuite(IgniteDiscoveryMassiveNodeFailTest.class));
+
         // Client connect.
         suite.addTest(new TestSuite(IgniteClientConnectTest.class));
         suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class));