IGNITE-13705 : Another node fails with failure of target node. (#8484)
(cherry picked from commit edb736dcd8d1d57c875ce7de2b2b2b786d1f8d51)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d959dc7..6a25de5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -214,6 +214,9 @@
/** Interval of checking connection to next node in the ring. */
private long connCheckInterval;
+ /** Fundamental value for connection checking actions. */
+ private long connCheckTick;
+
/** */
private IgniteThreadPoolExecutor utilityPool;
@@ -385,9 +388,12 @@
lastRingMsgSentTime = 0;
+ // Foundumental timeout value for actions related to connection check.
+ connCheckTick = effectiveExchangeTimeout() / 3;
+
// Since we take in account time of last sent message, the interval should be quite short to give enough piece
// of failure detection timeout as send-and-acknowledge timeout of the message to send.
- connCheckInterval = Math.min(effectiveExchangeTimeout() / 4, MAX_CON_CHECK_INTERVAL);
+ connCheckInterval = Math.min(connCheckTick, MAX_CON_CHECK_INTERVAL);
utilityPool = new IgniteThreadPoolExecutor("disco-pool",
spi.ignite().name(),
@@ -3510,12 +3516,19 @@
if (changeTop)
hndMsg.changeTopology(ring.previousNodeOf(next).id());
- if (log.isDebugEnabled())
- log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + ']');
+ if (log.isDebugEnabled()) {
+ log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState +
+ "] with timeout " + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ }
spi.writeToSocket(sock, out, hndMsg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ if (log.isDebugEnabled()) {
+ log.debug("Reading handshake response with timeout " +
+ timeoutHelper.nextTimeoutChunk(ackTimeout0));
+ }
+
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
timeoutHelper.nextTimeoutChunk(ackTimeout0));
@@ -6526,6 +6539,26 @@
}
}
+ /**
+ * Creates proper timeout helper taking in account current send state and ring state.
+ *
+ * @param sndState Current connection recovering state. Ignored if {@code null}.
+ * @param lastOperationNanos Time of last related operation. Ignored if negative or 0.
+ * @return Timeout helper.
+ */
+ private IgniteSpiOperationTimeoutHelper serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
+ long lastOperationNanos) {
+ long absoluteThreshold = -1;
+
+ // Active send-state means we lost connection to next node and have to find another. We don't know how many
+ // nodes failed. May be several failed in a row. But we got only one connectionRecoveryTimeout to establish new
+ // connection. We should travers rest of the cluster with sliced timeout for each node.
+ if (sndState != null)
+ absoluteThreshold = Math.min(sndState.failTimeNanos, System.nanoTime() + U.millisToNanos(connCheckTick));
+
+ return new IgniteSpiOperationTimeoutHelper(spi, true, lastOperationNanos, absoluteThreshold);
+ }
+
/** Fixates time of last sent message. */
private void updateLastSentMessageTime() {
lastRingMsgSentTime = System.nanoTime();
@@ -6887,13 +6920,22 @@
(req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
Collection<InetSocketAddress> nodeAddrs = spi.getNodeAddresses(previous, false);
- liveAddr = checkConnection(new ArrayList<>(nodeAddrs),
- (int)U.nanosToMillis(timeThreshold - now));
+ // The connection recovery connection to one node is connCheckTick.
+ // We need to suppose network delays. So we use half of this time.
+ int backwardCheckTimeout = (int)(connCheckTick / 2);
- if (log.isInfoEnabled())
- log.info("Connection check done [liveAddr=" + liveAddr
- + ", previousNode=" + previous + ", addressesToCheck=" + nodeAddrs
- + ", connectingNodeId=" + nodeId + ']');
+ if (log.isDebugEnabled()) {
+ log.debug("Remote node requests topology change. Checking connection to " +
+ "previous [" + previous + "] with timeout " + backwardCheckTimeout);
+ }
+
+ liveAddr = checkConnection(new ArrayList<>(nodeAddrs), backwardCheckTimeout);
+
+ if (log.isInfoEnabled()) {
+ log.info("Connection check to previous node done: [liveAddr=" + liveAddr
+ + ", previousNode=" + U.toShortString(previous) + ", addressesToCheck=" +
+ nodeAddrs + ", connectingNodeId=" + nodeId + ']');
+ }
}
// If local node was able to connect to previous, confirm that it's alive.
@@ -6912,6 +6954,11 @@
}
}
+ if (log.isDebugEnabled()) {
+ log.debug("Sending handshake response [" + res + "] with timeout " +
+ spi.getEffectiveSocketTimeout(srvSock) + " to " + rmtAddr + ":" + sock.getPort());
+ }
+
spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock));
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index 992b0dd..3bca1b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -21,7 +21,11 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
@@ -206,6 +210,74 @@
}
/**
+ * Ensures sequential failure of two nodes has no additional issues.
+ */
+ @Test
+ public void testSequentialFailTwoNodes() throws Exception {
+ simulateFailureOfTwoNodes(true);
+ }
+
+ /**
+ * Ensures sequential failure of two nodes has no additional issues.
+ */
+ @Test
+ public void testNotSequentialFailTwoNodes() throws Exception {
+ simulateFailureOfTwoNodes(false);
+ }
+
+ /** */
+ private void simulateFailureOfTwoNodes(boolean sequentionally) throws Exception {
+ failureDetectionTimeout = 1000;
+
+ int gridCnt = 7;
+
+ startGrids(gridCnt);
+
+ awaitPartitionMapExchange();
+
+ final CountDownLatch failLatch = new CountDownLatch(2);
+
+ for (int i = 0; i < gridCnt; i++) {
+ ignite(i).events().localListen(evt -> {
+ failLatch.countDown();
+
+ return true;
+ }, EVT_NODE_FAILED);
+
+ int nodeIdx = i;
+
+ ignite(i).events().localListen(evt -> {
+ segmentedNodes.add(nodeIdx);
+
+ return true;
+ }, EVT_NODE_SEGMENTED);
+ }
+
+ Set<Integer> failedNodes = new HashSet<>();
+
+ failedNodes.add(2);
+
+ if (sequentionally)
+ failedNodes.add(3);
+ else
+ failedNodes.add(4);
+
+ failedNodes.forEach(idx -> processNetworkThreads(ignite(idx), Thread::suspend));
+
+ try {
+ failLatch.await(10, TimeUnit.SECONDS);
+ }
+ finally {
+ failedNodes.forEach(idx -> processNetworkThreads(ignite(idx), Thread::resume));
+ }
+
+ for (int i = 0; i < gridCnt; i++) {
+ if (!failedNodes.contains(i))
+ assertFalse(segmentedNodes.contains(i));
+ }
+ }
+
+ /**
* @param ig Ignite instance to get failedNodes collection from.
*/
private Map getFailedNodesCollection(IgniteEx ig) {
@@ -244,9 +316,9 @@
CommunicationSpi<?> comm = ignite.configuration().getCommunicationSpi();
- GridNioServer<?> nioServerWrapper = U.field(comm, "nioSrvr");
+ GridNioServer<?> gridNioServer = U.field(comm, "nioSrvr");
- for (GridWorker worker : nioServerWrapper.workers())
+ for (GridWorker worker : gridNioServer.workers())
proc.accept(worker.runner());
}
}