IGNITE-13705 : Another node fails with failure of target node. (#8484)

(cherry picked from commit edb736dcd8d1d57c875ce7de2b2b2b786d1f8d51)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d959dc7..6a25de5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -214,6 +214,9 @@
     /** Interval of checking connection to next node in the ring. */
     private long connCheckInterval;
 
+    /** Fundamental value for connection checking actions. */
+    private long connCheckTick;
+
     /** */
     private IgniteThreadPoolExecutor utilityPool;
 
@@ -385,9 +388,12 @@
 
         lastRingMsgSentTime = 0;
 
+        // Foundumental timeout value for actions related to connection check.
+        connCheckTick = effectiveExchangeTimeout() / 3;
+
         // Since we take in account time of last sent message, the interval should be quite short to give enough piece
         // of failure detection timeout as send-and-acknowledge timeout of the message to send.
-        connCheckInterval = Math.min(effectiveExchangeTimeout() / 4, MAX_CON_CHECK_INTERVAL);
+        connCheckInterval = Math.min(connCheckTick, MAX_CON_CHECK_INTERVAL);
 
         utilityPool = new IgniteThreadPoolExecutor("disco-pool",
             spi.ignite().name(),
@@ -3510,12 +3516,19 @@
                                 if (changeTop)
                                     hndMsg.changeTopology(ring.previousNodeOf(next).id());
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + ']');
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState +
+                                        "] with timeout " + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                                }
 
                                 spi.writeToSocket(sock, out, hndMsg,
                                     timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Reading handshake response with timeout " +
+                                        timeoutHelper.nextTimeoutChunk(ackTimeout0));
+                                }
+
                                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
                                     timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
@@ -6526,6 +6539,26 @@
         }
     }
 
+    /**
+     * Creates proper timeout helper taking in account current send state and ring state.
+     *
+     * @param sndState Current connection recovering state. Ignored if {@code null}.
+     * @param lastOperationNanos Time of last related operation. Ignored if negative or 0.
+     * @return Timeout helper.
+     */
+    private IgniteSpiOperationTimeoutHelper serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
+        long lastOperationNanos) {
+        long absoluteThreshold = -1;
+
+        // Active send-state means we lost connection to next node and have to find another. We don't know how many
+        // nodes failed. May be several failed in a row. But we got only one connectionRecoveryTimeout to establish new
+        // connection. We should travers rest of the cluster with sliced timeout for each node.
+        if (sndState != null)
+            absoluteThreshold = Math.min(sndState.failTimeNanos, System.nanoTime() + U.millisToNanos(connCheckTick));
+
+        return new IgniteSpiOperationTimeoutHelper(spi, true, lastOperationNanos, absoluteThreshold);
+    }
+
     /** Fixates time of last sent message. */
     private void updateLastSentMessageTime() {
         lastRingMsgSentTime = System.nanoTime();
@@ -6887,13 +6920,22 @@
                                 (req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
                                 Collection<InetSocketAddress> nodeAddrs = spi.getNodeAddresses(previous, false);
 
-                                liveAddr = checkConnection(new ArrayList<>(nodeAddrs),
-                                    (int)U.nanosToMillis(timeThreshold - now));
+                                // The connection recovery connection to one node is connCheckTick.
+                                // We need to suppose network delays. So we use half of this time.
+                                int backwardCheckTimeout = (int)(connCheckTick / 2);
 
-                                if (log.isInfoEnabled())
-                                    log.info("Connection check done [liveAddr=" + liveAddr
-                                        + ", previousNode=" + previous + ", addressesToCheck=" + nodeAddrs
-                                        + ", connectingNodeId=" + nodeId + ']');
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Remote node requests topology change. Checking connection to " +
+                                        "previous [" + previous + "] with timeout " + backwardCheckTimeout);
+                                }
+
+                                liveAddr = checkConnection(new ArrayList<>(nodeAddrs), backwardCheckTimeout);
+
+                                if (log.isInfoEnabled()) {
+                                    log.info("Connection check to previous node done: [liveAddr=" + liveAddr
+                                        + ", previousNode=" + U.toShortString(previous) + ", addressesToCheck=" +
+                                        nodeAddrs + ", connectingNodeId=" + nodeId + ']');
+                                }
                             }
 
                             // If local node was able to connect to previous, confirm that it's alive.
@@ -6912,6 +6954,11 @@
                         }
                     }
 
+                    if (log.isDebugEnabled()) {
+                        log.debug("Sending handshake response [" + res + "] with timeout " +
+                            spi.getEffectiveSocketTimeout(srvSock) + " to " + rmtAddr + ":" + sock.getPort());
+                    }
+
                     spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock));
 
                     // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index 992b0dd..3bca1b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -21,7 +21,11 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import org.apache.ignite.Ignite;
@@ -206,6 +210,74 @@
     }
 
     /**
+     * Ensures sequential failure of two nodes has no additional issues.
+     */
+    @Test
+    public void testSequentialFailTwoNodes() throws Exception {
+        simulateFailureOfTwoNodes(true);
+    }
+
+    /**
+     * Ensures sequential failure of two nodes has no additional issues.
+     */
+    @Test
+    public void testNotSequentialFailTwoNodes() throws Exception {
+        simulateFailureOfTwoNodes(false);
+    }
+
+    /** */
+    private void simulateFailureOfTwoNodes(boolean sequentionally) throws Exception {
+        failureDetectionTimeout = 1000;
+
+        int gridCnt = 7;
+
+        startGrids(gridCnt);
+
+        awaitPartitionMapExchange();
+
+        final CountDownLatch failLatch = new CountDownLatch(2);
+
+        for (int i = 0; i < gridCnt; i++) {
+            ignite(i).events().localListen(evt -> {
+                failLatch.countDown();
+
+                return true;
+            }, EVT_NODE_FAILED);
+
+            int nodeIdx = i;
+
+            ignite(i).events().localListen(evt -> {
+                segmentedNodes.add(nodeIdx);
+
+                return true;
+            }, EVT_NODE_SEGMENTED);
+        }
+
+        Set<Integer> failedNodes = new HashSet<>();
+
+        failedNodes.add(2);
+
+        if (sequentionally)
+            failedNodes.add(3);
+        else
+            failedNodes.add(4);
+
+        failedNodes.forEach(idx -> processNetworkThreads(ignite(idx), Thread::suspend));
+
+        try {
+            failLatch.await(10, TimeUnit.SECONDS);
+        }
+        finally {
+            failedNodes.forEach(idx -> processNetworkThreads(ignite(idx), Thread::resume));
+        }
+
+        for (int i = 0; i < gridCnt; i++) {
+            if (!failedNodes.contains(i))
+                assertFalse(segmentedNodes.contains(i));
+        }
+    }
+
+    /**
      * @param ig Ignite instance to get failedNodes collection from.
      */
     private Map getFailedNodesCollection(IgniteEx ig) {
@@ -244,9 +316,9 @@
 
         CommunicationSpi<?> comm = ignite.configuration().getCommunicationSpi();
 
-        GridNioServer<?> nioServerWrapper = U.field(comm, "nioSrvr");
+        GridNioServer<?> gridNioServer = U.field(comm, "nioSrvr");
 
-        for (GridWorker worker : nioServerWrapper.workers())
+        for (GridWorker worker : gridNioServer.workers())
             proc.accept(worker.runner());
     }
 }