IGNITE-7163 Validate connection from a pre-previous node. - Fixes #4088.
Signed-off-by: Dmitriy Pavlov <dpavlov@apache.org>
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index bb76895..50bb383 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -251,6 +251,9 @@
/** Discovery state. */
protected TcpDiscoverySpiState spiState = DISCONNECTED;
+ /** Last time received message from ring. */
+ private volatile long lastRingMsgReceivedTime;
+
/** Map with proceeding ping requests. */
private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap =
new ConcurrentHashMap<>();
@@ -313,11 +316,18 @@
}
/** {@inheritDoc} */
+ @Override public long connectionCheckInterval() {
+ return msgWorker.connCheckFreq;
+ }
+
+ /** {@inheritDoc} */
@Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
synchronized (mux) {
spiState = DISCONNECTED;
}
+ lastRingMsgReceivedTime = 0;
+
utilityPool = new IgniteThreadPoolExecutor("disco-pool",
spi.ignite().name(),
0,
@@ -2726,8 +2736,8 @@
assert connCheckFreq > 0;
- if (log.isDebugEnabled())
- log.debug("Connection check frequency is calculated: " + connCheckFreq);
+ if (log.isInfoEnabled())
+ log.info("Connection check frequency is calculated: " + connCheckFreq);
}
/**
@@ -2905,7 +2915,7 @@
sendMessageToClients(msg);
- Collection<TcpDiscoveryNode> failedNodes;
+ List<TcpDiscoveryNode> failedNodes;
TcpDiscoverySpiState state;
@@ -2921,9 +2931,12 @@
boolean newNextNode = false;
+ // Used only if spi.getEffectiveConnectionRecoveryTimeout > 0
+ CrossRingMessageSendState sndState = null;
+
UUID locNodeId = getLocalNodeId();
- while (true) {
+ ringLoop: while (true) {
TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
if (newNext == null) {
@@ -2947,6 +2960,8 @@
if (log.isDebugEnabled())
log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+ else if (log.isInfoEnabled())
+ log.info("New next node [newNext=" + newNext + ']');
if (debugMode)
debugLog(msg, "New next node [newNext=" + newNext + ", formerNext=" + next +
@@ -3003,12 +3018,52 @@
openSock = true;
// Handshake.
- spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId),
+ TcpDiscoveryHandshakeRequest hndMsg = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+ // Topology treated as changes if next node is not available.
+ hndMsg.changeTopology(sndState != null && !sndState.isStartingPoint());
+
+ if (log.isDebugEnabled())
+ log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + ']');
+
+ spi.writeToSocket(sock, out, hndMsg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
timeoutHelper.nextTimeoutChunk(ackTimeout0));
+ if (log.isDebugEnabled())
+ log.debug("Handshake response: " + res);
+
+ if (res.previousNodeAlive() && sndState != null) {
+ // Remote node checked connection to it's previous and got success.
+ boolean previousNode = sndState.markLastFailedNodeAlive();
+
+ if (previousNode)
+ failedNodes.remove(failedNodes.size() - 1);
+ else {
+ newNextNode = false;
+
+ next = ring.nextNode(failedNodes);
+ }
+
+ U.closeQuiet(sock);
+
+ sock = null;
+
+ if (sndState.isFailed()) {
+ segmentLocalNodeOnSendFail();
+
+ return; // Nothing to do here.
+ }
+
+ if (previousNode)
+ U.warn(log, "New next node has connection to it's previous, trying previous " +
+ "again. [next=" + next + ']');
+
+ continue ringLoop;
+ }
+
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
log.debug("Handshake response from local node: " + res);
@@ -3298,7 +3353,12 @@
} // Iterating node's addresses.
if (!sent) {
- if (!failedNodes.contains(next)) {
+ if (sndState == null && spi.getEffectiveConnectionRecoveryTimeout() > 0)
+ sndState = new CrossRingMessageSendState();
+
+ boolean failedNextNode = sndState == null || sndState.markNextNodeFailed();
+
+ if (failedNextNode && !failedNodes.contains(next)) {
failedNodes.add(next);
if (state == CONNECTED) {
@@ -3313,6 +3373,26 @@
", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
}
}
+ else if (!failedNextNode && sndState != null && sndState.isBackward()) {
+ boolean prev = sndState.markLastFailedNodeAlive();
+
+ U.warn(log, "Failed to send message to next node, try previous [msg=" + msg +
+ ", next=" + next + ']');
+
+ if (prev)
+ failedNodes.remove(failedNodes.size() - 1);
+ else {
+ newNextNode = false;
+
+ next = ring.nextNode(failedNodes);
+ }
+ }
+
+ if (sndState != null && sndState.isFailed()) {
+ segmentLocalNodeOnSendFail();
+
+ return; // Nothing to do here.
+ }
next = null;
@@ -3384,6 +3464,23 @@
}
/**
+ * Segment local node on failed message send.
+ */
+ private void segmentLocalNodeOnSendFail() {
+ U.warn(log, "Unable to connect to next nodes in a ring, " +
+ "it seems local node is experiencing connectivity issues. Segmenting local node " +
+ "to avoid case when one node fails a big part of cluster. To disable" +
+ " that behavior set TcpDiscoverySpi.setConnectionRecoveryTimeout() to 0. " +
+ "[connRecoveryTimeout=" + spi.connRecoveryTimeout + ", effectiveConnRecoveryTimeout="
+ + spi.getEffectiveConnectionRecoveryTimeout() + ']');
+
+ // Remove any queued messages to avoid new connect tries.
+ queue.clear();
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode);
+ }
+
+ /**
* @param msg Message.
* @return Whether to redirect message to client nodes.
*/
@@ -5923,6 +6020,60 @@
if (req.client())
res.clientAck(true);
+ else if (req.changeTopology()) {
+ // Node cannot connect to it's next (for local node it's previous).
+ // Need to check connectivity to it.
+ long rcvdTime = lastRingMsgReceivedTime;
+ long now = U.currentTimeMillis();
+
+ // We got message from previous in less than double connection check interval.
+ boolean ok = rcvdTime + msgWorker.connCheckFreq * 2 >= now;
+
+ if (ok) {
+ // Check case when previous node suddenly died. This will speed up
+ // node failing.
+ Set<TcpDiscoveryNode> failed;
+
+ synchronized (mux) {
+ failed = failedNodes.keySet();
+ }
+
+ TcpDiscoveryNode previous = ring.previousNode(failed);
+
+ InetSocketAddress liveAddr = null;
+
+ if (previous != null && !previous.id().equals(nodeId)) {
+ Collection<InetSocketAddress> nodeAddrs =
+ spi.getNodeAddresses(previous, false);
+
+ for (InetSocketAddress addr : nodeAddrs) {
+ // Connection refused may be got if node doesn't listen
+ // (or blocked by firewall, but anyway assume it is dead).
+ if (!isConnectionRefused(addr)) {
+ liveAddr = addr;
+
+ break;
+ }
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Connection check done: [liveAddr=" + liveAddr
+ + ", previousNode=" + previous + ", addressesToCheck=" + nodeAddrs
+ + ", connectingNodeId=" + nodeId + ']');
+ }
+
+ // If local node was able to connect to previous, confirm that it's alive.
+ ok = liveAddr != null && (!liveAddr.getAddress().isLoopbackAddress()
+ || !locNode.socketAddresses().contains(liveAddr));
+ }
+
+ res.previousNodeAlive(ok);
+
+ if (log.isInfoEnabled()) {
+ log.info("Previous node alive: [alive=" + ok + ", lastMessageReceivedTime="
+ + rcvdTime + ", now=" + now + ", connCheckFreq=" + msgWorker.connCheckFreq + ']');
+ }
+ }
spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock));
@@ -6062,6 +6213,8 @@
debugLog(msg, "Message has been received: " + msg);
if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+ ringMessageReceived();
+
spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
continue;
@@ -6239,6 +6392,8 @@
continue;
}
else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) {
+ ringMessageReceived();
+
if (log.isInfoEnabled())
log.info("Latency check message has been read: " + msg.id());
@@ -6249,8 +6404,11 @@
if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage)
metricsUpdateMsg = (TcpDiscoveryClientMetricsUpdateMessage)msg;
- else
+ else {
+ ringMessageReceived();
+
msgWorker.addMessage(msg);
+ }
// Send receipt back.
if (clientMsgWrk != null) {
@@ -6339,6 +6497,31 @@
}
/**
+ * Update last ring message received timestamp.
+ */
+ private void ringMessageReceived() {
+ lastRingMsgReceivedTime = U.currentTimeMillis();
+ }
+
+ /**
+ * @param addr Address to check.
+ * @return {@code True} if got connection refused on connect try.
+ */
+ private boolean isConnectionRefused(SocketAddress addr) {
+ try (Socket sock = new Socket()) {
+ sock.connect(addr, 100);
+ }
+ catch (ConnectException e) {
+ return true;
+ }
+ catch (IOException e) {
+ return false;
+ }
+
+ return false;
+ }
+
+ /**
* Processes client reconnect message.
*
* @param msg Client reconnect message.
@@ -6967,4 +7150,131 @@
this.sock = sock;
}
}
+
+ /**
+ *
+ */
+ private enum RingMessageSendState {
+ /** */
+ STARTING_POINT,
+
+ /** */
+ FORWARD_PASS,
+
+ /** */
+ BACKWARD_PASS,
+
+ /** */
+ FAILED
+ }
+
+ /**
+ * Initial state is {@link RingMessageSendState#STARTING_POINT}.<br>
+ * States could be switched:<br>
+ * {@link RingMessageSendState#STARTING_POINT} => {@link RingMessageSendState#FORWARD_PASS} when next node failed.<br>
+ * {@link RingMessageSendState#FORWARD_PASS} => {@link RingMessageSendState#FORWARD_PASS} when new next node failed.<br>
+ * {@link RingMessageSendState#FORWARD_PASS} => {@link RingMessageSendState#BACKWARD_PASS} when new next node has
+ * connection to it's previous node and forces local node to try it again.<br>
+ * {@link RingMessageSendState#BACKWARD_PASS} => {@link RingMessageSendState#BACKWARD_PASS} when previously tried node
+ * has connection to it's previous and forces local node to try it again.<br>
+ * {@link RingMessageSendState#BACKWARD_PASS} => {@link RingMessageSendState#STARTING_POINT} when local node came back
+ * to initial next node and no topology changes should be performed.<br>
+ * {@link RingMessageSendState#BACKWARD_PASS} => {@link RingMessageSendState#FAILED} when recovery timeout is over and
+ * all new next nodes have connections to their previous nodes. That means local node has connectivity
+ * issue and should be stopped.<br>
+ */
+ private class CrossRingMessageSendState {
+ /** */
+ private RingMessageSendState state = RingMessageSendState.STARTING_POINT;
+
+ /** */
+ private int failedNodes;
+
+ /** */
+ private final long failTime;
+
+ /**
+ *
+ */
+ CrossRingMessageSendState() {
+ failTime = spi.getEffectiveConnectionRecoveryTimeout() + U.currentTimeMillis();
+ }
+
+ /**
+ * @return {@code True} if state is {@link RingMessageSendState#STARTING_POINT}.
+ */
+ boolean isStartingPoint() {
+ return state == RingMessageSendState.STARTING_POINT;
+ }
+
+ /**
+ * @return {@code True} if state is {@link RingMessageSendState#BACKWARD_PASS}.
+ */
+ boolean isBackward() {
+ return state == RingMessageSendState.BACKWARD_PASS;
+ }
+
+ /**
+ * @return {@code True} if state is {@link RingMessageSendState#FAILED}.
+ */
+ boolean isFailed() {
+ return state == RingMessageSendState.FAILED;
+ }
+
+ /**
+ * Marks next node as failed.
+ *
+ * @return {@code True} node marked as failed.
+ */
+ boolean markNextNodeFailed() {
+ if (state == RingMessageSendState.STARTING_POINT || state == RingMessageSendState.FORWARD_PASS) {
+ state = RingMessageSendState.FORWARD_PASS;
+
+ failedNodes++;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Marks last failed node as alive.
+ *
+ * @return {@code False} if all failed nodes marked as alive or incorrect state.
+ */
+ boolean markLastFailedNodeAlive() {
+ if (state == RingMessageSendState.FORWARD_PASS || state == RingMessageSendState.BACKWARD_PASS) {
+ state = RingMessageSendState.BACKWARD_PASS;
+
+ if (--failedNodes <= 0) {
+ failedNodes = 0;
+
+ if (U.currentTimeMillis() >= failTime) {
+ state = RingMessageSendState.FAILED;
+
+ return false;
+ }
+
+ state = RingMessageSendState.STARTING_POINT;
+
+ try {
+ Thread.sleep(200);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CrossRingMessageSendState.class, this);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 00d83dd..64cc2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -248,6 +248,13 @@
}
/**
+ * @return connection check interval.
+ */
+ public long connectionCheckInterval() {
+ return 0;
+ }
+
+ /**
* @throws IgniteSpiException If failed.
*/
public abstract void spiStop() throws IgniteSpiException;
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 801f2b6..0c7a56f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -55,9 +55,9 @@
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
-import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -277,6 +277,9 @@
/** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
+ /** Default connection recovery timeout in ms. */
+ public static final long DFLT_CONNECTION_RECOVERY_TIMEOUT = IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT;
+
/** Ssl message pattern for StreamCorruptedException. */
private static Pattern sslMsgPattern = Pattern.compile("invalid stream header: 150\\d0\\d00");
@@ -311,6 +314,9 @@
/** Size of topology snapshots history. */
protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
+ /** Default connection recovery timeout in ms. */
+ protected long connRecoveryTimeout = DFLT_CONNECTION_RECOVERY_TIMEOUT;
+
/** Grid discovery listener. */
protected volatile DiscoverySpiListener lsnr;
@@ -994,6 +1000,54 @@
return this;
}
+ /**
+ * Gets timeout that defines how long server node would try to recovery connection.<br>
+ * See {@link #setConnectionRecoveryTimeout(long)} for details.
+ *
+ * @return Timeout that defines how long server node would try to recovery connection.
+ */
+ public long getConnectionRecoveryTimeout() {
+ return connRecoveryTimeout;
+ }
+
+ /**
+ * @return Connection recovery timeout that is not greater than failureDetectionTimeout if enabled.
+ */
+ long getEffectiveConnectionRecoveryTimeout() {
+ if (failureDetectionTimeoutEnabled() && failureDetectionTimeout() < connRecoveryTimeout)
+ return failureDetectionTimeout();
+
+ return connRecoveryTimeout;
+ }
+
+ /**
+ * Sets timeout that defines how long server node would try to recovery connection.
+ * <p>In case local node has temporary connectivity issues with part of the cluster,
+ * it may sequentially fail nodes one-by-one till successfully connect to one that
+ * has a fine connection with.
+ * This leads to fail of big number of nodes.
+ * </p>
+ * <p>
+ * To overcome that issue, local node will do a sequential connection tries to next
+ * nodes. But if new next node has connection to previous it forces local node to
+ * retry connect to previous. These tries will last till timeout will not
+ * finished. When timeout is over, but no success in connecting to nodes it will
+ * segment itself.
+ * </p>
+ * <p>
+ * Cannot be greater than {@link #failureDetectionTimeout()}.
+ * </p>
+ * <p>
+ * Default is {@link #DFLT_CONNECTION_RECOVERY_TIMEOUT}.
+ * </p>
+ *
+ * @param connRecoveryTimeout Timeout that defines how long server node would try to recovery connection.
+ * {@code 0} means node will not recheck failed nodes.
+ */
+ public void setConnectionRecoveryTimeout(long connRecoveryTimeout) {
+ this.connRecoveryTimeout = connRecoveryTimeout;
+ }
+
/** {@inheritDoc} */
@Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
assert locNodeAttrs == null;
@@ -2438,6 +2492,11 @@
}
/** {@inheritDoc} */
+ @Override public long getConnectionCheckInterval() {
+ return impl.connectionCheckInterval();
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isClientMode() {
return TcpDiscoverySpi.this.isClientMode();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
index cb0fd36..176cc07 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -56,6 +56,14 @@
public int getReconnectCount();
/**
+ * Gets connection check interval in ms.
+ *
+ * @return Number of connection attempts.
+ */
+ @MXBeanDescription("Connection check interval.")
+ public long getConnectionCheckInterval();
+
+ /**
* Gets network timeout.
*
* @return Network timeout.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 54ddc9e..7fc394b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -17,18 +17,6 @@
package org.apache.ignite.spi.discovery.tcp.internal;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.PN;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.jetbrains.annotations.Nullable;
-
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -40,6 +28,17 @@
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.PN;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
/**
* Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
@@ -506,6 +505,42 @@
}
/**
+ * Finds previous node in the topology filtering excluded nodes from search.
+ * <p>
+ * This may be used when detecting and handling nodes failure.
+ *
+ * @param excluded Nodes to exclude from the search (optional). If provided,
+ * cannot contain local node.
+ * @return Previous node or {@code null} if all nodes were filtered out or
+ * topology contains less than two nodes.
+ */
+ @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+ rwLock.readLock().lock();
+
+ try {
+ Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+ if (filtered.size() < 2)
+ return null;
+
+ TcpDiscoveryNode previous = null;
+
+ // Get last node that is previous in a ring
+ for (TcpDiscoveryNode node : filtered) {
+ if (locNode.equals(node) && previous != null)
+ break;
+
+ previous = node;
+ }
+
+ return previous;
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
* Gets current topology version.
*
* @return Current topology version.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index a23cb63..93d8bcf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -46,6 +46,9 @@
protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
/** */
+ protected static final int CHANGE_TOPOLOGY_FLAG_POS = 3;
+
+ /** */
protected static final int CLIENT_ACK_FLAG_POS = 4;
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
index ea5b868..90c9f94 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
@@ -36,8 +36,29 @@
super(creatorNodeId);
}
+ /**
+ * Gets topology change flag.<br>
+ * {@code True} means node intent to fail nodes in a ring.
+ *
+ * @return Change topology flag.
+ */
+ public boolean changeTopology() {
+ return getFlag(CHANGE_TOPOLOGY_FLAG_POS);
+ }
+
+ /**
+ * Gets topology change flag.<br>
+ * {@code True} means node intent to fail nodes in a ring.
+ *
+ * @param changeTop Change topology flag.
+ */
+ public void changeTopology(boolean changeTop) {
+ setFlag(CHANGE_TOPOLOGY_FLAG_POS, changeTop);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString());
+ return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString(),
+ "isChangeTopology", changeTopology());
}
}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 0d350af..75df5c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -43,6 +43,26 @@
}
/**
+ * Gets previous node alive flag.<br>
+ * {@code True} means node has connectivity to it's previous node in a ring.
+ *
+ * @return previous node alive flag.
+ */
+ public boolean previousNodeAlive() {
+ return getFlag(CHANGE_TOPOLOGY_FLAG_POS);
+ }
+
+ /**
+ * Sets topology change flag.<br>
+ * {@code True} means node has connectivity to it's previous node in a ring.
+ *
+ * @param prevNodeAlive previous node alive flag.
+ */
+ public void previousNodeAlive(boolean prevNodeAlive) {
+ setFlag(CHANGE_TOPOLOGY_FLAG_POS, prevNodeAlive);
+ }
+
+ /**
* Gets order of the node sent the response.
*
* @return Order of the node sent the response.
@@ -76,6 +96,7 @@
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString());
+ return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString(),
+ "isPreviousNodeAlive", previousNodeAlive());
}
}
\ No newline at end of file
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
new file mode 100644
index 0000000..32ce978
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests checks case when one node is unable to connect to next in a ring,
+ * but those nodes are not experiencing any connectivity troubles between
+ * each other.
+ */
+public class IgniteDiscoveryMassiveNodeFailTest extends GridCommonAbstractTest {
+ /** */
+ private static final int FAILURE_DETECTION_TIMEOUT = 5_000;
+
+ /** */
+ private Set<InetSocketAddress> failedAddrs = new GridConcurrentHashSet<>();
+
+ /** */
+ private volatile TcpDiscoveryNode compromisedNode;
+
+ /** */
+ private volatile boolean forceFailConnectivity;
+
+ /** */
+ private volatile boolean failNodes;
+
+ /** */
+ private long timeout;
+
+ /** */
+ private volatile Set<ClusterNode> failedNodes = Collections.emptySet();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ FailDiscoverySpi disco = new FailDiscoverySpi();
+
+ disco.setIpFinder(LOCAL_IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ disco.setConnectionRecoveryTimeout(timeout);
+
+ cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "false");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE, "true");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ timeout = 2_000;
+ failNodes = false;
+ forceFailConnectivity = false;
+ }
+
+ /**
+ * Node fails 2 nodes when connection check is disabled.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMassiveFailDisabledRecovery() throws Exception {
+ timeout = 0; // Disable previous node check.
+
+ doFailNodes(false);
+ }
+
+ /**
+ *
+ */
+ private void doFailNodes(boolean simulateNodeFailure) throws Exception {
+ startGrids(5);
+
+ grid(0).events().enabledEvents();
+
+ failedNodes = new HashSet<>(Arrays.asList(grid(3).cluster().localNode(), grid(4).cluster().localNode()));
+
+ CountDownLatch latch = new CountDownLatch(failedNodes.size());
+
+ grid(0).events().localListen(e -> {
+ DiscoveryEvent evt = (DiscoveryEvent)e;
+
+ if (failedNodes.contains(evt.eventNode()))
+ latch.countDown();
+
+ return true;
+ }, EventType.EVT_NODE_FAILED);
+
+ compromisedNode = (TcpDiscoveryNode)grid(2).localNode();
+
+ for (int i = 3; i < 5; i++)
+ failedAddrs.addAll(((TcpDiscoveryNode)grid(i).localNode()).socketAddresses());
+
+ System.out.println(">> Start failing nodes");
+
+ forceFailConnectivity = true;
+
+ if (simulateNodeFailure) {
+ for (int i = 3; i < 5; i++)
+ ((TcpDiscoverySpi)grid(i).configuration().getDiscoverySpi()).simulateNodeFailure();
+ }
+
+ assert latch.await(waitTime(), TimeUnit.MILLISECONDS);
+
+ assertEquals(3, grid(0).cluster().forServers().nodes().size());
+ }
+
+ /**
+ *
+ */
+ private long waitTime() {
+ return timeout + 5000;
+ }
+
+ /**
+ * Node fail itself.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMassiveFailSelfKill() throws Exception {
+ startGrids(5);
+
+ grid(0).events().enabledEvents();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen((e) -> {
+ DiscoveryEvent evt = (DiscoveryEvent)e;
+
+ if (evt.eventNode().equals(compromisedNode))
+ latch.countDown();
+
+ return true;
+ }, EventType.EVT_NODE_FAILED);
+
+ compromisedNode = (TcpDiscoveryNode)grid(2).localNode();
+
+ for (int i = 3; i < 5; i++)
+ failedAddrs.addAll(((TcpDiscoveryNode)grid(i).localNode()).socketAddresses());
+
+ System.out.println(">> Start failing nodes");
+
+ forceFailConnectivity = true;
+
+ assert latch.await(waitTime(), TimeUnit.MILLISECONDS);
+
+ assertEquals(4, grid(0).cluster().forServers().nodes().size());
+ }
+
+ /**
+ * When connectivity restored, no topology changes will be applied.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMassiveFailAndRecovery() throws Exception {
+ startGrids(5);
+
+ grid(0).events().enabledEvents();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen(e -> {
+ DiscoveryEvent evt = (DiscoveryEvent)e;
+
+ if (evt.eventNode().equals(compromisedNode))
+ latch.countDown();
+
+ return true;
+ }, EventType.EVT_NODE_FAILED);
+
+ compromisedNode = (TcpDiscoveryNode)grid(2).localNode();
+
+ for (int i = 3; i < 5; i++)
+ failedAddrs.addAll(((TcpDiscoveryNode)grid(i).localNode()).socketAddresses());
+
+ System.out.println(">> Start failing nodes");
+
+ forceFailConnectivity = true;
+
+ doSleep(timeout / 4); // wait 1 try
+
+ forceFailConnectivity = false;
+
+ System.out.println(">> Stop failing nodes");
+
+ assert !latch.await(waitTime(), TimeUnit.MILLISECONDS);
+
+ // Topology is not changed
+ assertEquals(5, grid(0).cluster().forServers().nodes().size());
+ assertEquals(5, grid(0).cluster().topologyVersion());
+ }
+
+ /**
+ * Regular nodes fail by timeout.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMassiveFail() throws Exception {
+ failNodes = true;
+
+ // Must be greater than failureDetectionTimeout / 3 as it calculated into
+ // connection check frequency.
+ timeout = FAILURE_DETECTION_TIMEOUT;
+
+ doFailNodes(false);
+ }
+
+ /**
+ * Regular node fail by crash. Should be faster due to
+ *
+ *
+ * @throws Exception If failed.
+ */
+ public void testMassiveFailForceNodeFail() throws Exception {
+ failNodes = true;
+
+ // Must be greater than failureDetectionTimeout / 3 as it calculated into
+ // connection check frequency.
+ timeout = FAILURE_DETECTION_TIMEOUT / 2;
+
+ doFailNodes(true);
+ }
+
+ /**
+ * Check that cluster recovers from temporal connection breakage.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRecoveryOnDisconnect() throws Exception {
+ startGrids(3);
+
+ IgniteEx ignite1 = grid(1);
+ IgniteEx ignite2 = grid(2);
+
+ ((TcpDiscoverySpi)ignite1.configuration().getDiscoverySpi()).brakeConnection();
+ ((TcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).brakeConnection();
+
+ doSleep(FAILURE_DETECTION_TIMEOUT);
+
+ assertEquals(3, grid(0).cluster().nodes().size());
+ assertEquals(3, grid(1).cluster().nodes().size());
+ assertEquals(3, grid(2).cluster().nodes().size());
+ }
+
+ /**
+ *
+ */
+ private class FailDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+ long timeout) throws IOException {
+ assertNotFailedNode(sock);
+
+ if (isDrop(msg))
+ return;
+
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ assertNotFailedNode(sock);
+
+ if (isDrop(msg))
+ return;
+
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(ClusterNode node, Socket sock, OutputStream out,
+ TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException {
+ assertNotFailedNode(sock);
+
+ if (isDrop(msg))
+ return;
+
+ super.writeToSocket(node, sock, out, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ assertNotFailedNode(sock);
+
+ if (isDrop(msg))
+ return;
+
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+
+ /**
+ *
+ */
+ private boolean isDrop(TcpDiscoveryAbstractMessage msg) {
+ boolean drop = failNodes && forceFailConnectivity && failedNodes.contains(ignite.cluster().localNode());
+
+ if (drop)
+ ignite.log().info(">> Drop message " + msg);
+
+ return drop;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+ long timeout) throws IOException {
+ assertNotFailedNode(sock);
+
+ if (isDrop(msg))
+ return;
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+
+ /**
+ * @param sock Socket.
+ * @throws IOException To break connection.
+ */
+ @SuppressWarnings("SuspiciousMethodCalls")
+ private void assertNotFailedNode(Socket sock) throws IOException {
+ if (forceFailConnectivity && getLocalNode().equals(compromisedNode) && failedAddrs.contains(sock.getRemoteSocketAddress())) {
+ log.info(">> Force fail connection " + sock.getRemoteSocketAddress());
+
+ throw new IOException("Force fail connection " + sock.getRemoteSocketAddress());
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index f1c826a..c167a90 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -62,6 +62,9 @@
/** */
private static boolean useTestSpi;
+ /** */
+ private static boolean disableTopChangeRecovery;
+
/** {@inheritDoc} */
@Override protected boolean useFailureDetectionTimeout() {
return true;
@@ -89,7 +92,19 @@
/** {@inheritDoc} */
@Override protected TcpDiscoverySpi getDiscoverySpi() {
- return useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
+ TcpDiscoverySpi spi = useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
+
+ if (disableTopChangeRecovery)
+ spi.setConnectionRecoveryTimeout(0);
+
+ return spi;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ disableTopChangeRecovery = false;
}
/**
@@ -202,6 +217,7 @@
failureThreshold = 1000;
clientFailureDetectionTimeout = 10000;
useTestSpi = true;
+ disableTopChangeRecovery = true;
try {
startServerNodes(3);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
index df76afc..7d4d802 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySegmentationPolicyTest.java
@@ -45,6 +45,9 @@
if (igniteInstanceName.endsWith("2"))
cfg.setFailureHandler(new TestFailureHandler());
+ // Disable recovery
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setConnectionRecoveryTimeout(0);
+
return cfg;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index d50a967..1aae8fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -90,6 +90,7 @@
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -1675,15 +1676,15 @@
try {
final int FAIL_ORDER = 3;
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
final Ignite ignite0 = startGrid(0);
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
startGrid(1);
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
Ignite ignite2 = startGrid(2);
@@ -1699,6 +1700,18 @@
}
/**
+ * @param failOrder Fail order.
+ * @return Failed node spi.
+ */
+ @NotNull private TestFailedNodesSpi createFailedNodeSpi(int failOrder) {
+ TestFailedNodesSpi spi = new TestFailedNodesSpi(failOrder);
+
+ spi.setConnectionRecoveryTimeout(0);
+
+ return spi;
+ }
+
+ /**
* Coordinator is added in failed list, concurrent nodes start.
*
* @throws Exception If failed.
@@ -1707,11 +1720,11 @@
try {
final int FAIL_ORDER = 3;
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
Ignite ignite0 = startGrid(0);
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
startGrid(1);
@@ -1721,7 +1734,7 @@
@Override public Void call() throws Exception {
int idx = nodeIdx.incrementAndGet();
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
startGrid(idx);
@@ -1749,11 +1762,11 @@
*/
public void testFailedNodes3() throws Exception {
try {
- nodeSpi.set(new TestFailedNodesSpi(-1));
+ nodeSpi.set(createFailedNodeSpi(-1));
Ignite ignite0 = startGrid(0);
- nodeSpi.set(new TestFailedNodesSpi(2));
+ nodeSpi.set(createFailedNodeSpi(2));
Ignite ignite1 = startGrid(1);
@@ -1784,15 +1797,15 @@
try {
final int FAIL_ORDER = 3;
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
final Ignite ignite0 = startGrid(0);
- nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+ nodeSpi.set(createFailedNodeSpi(FAIL_ORDER));
Ignite ignite1 = startGrid(1);
- TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER);
+ TestFailedNodesSpi spi = createFailedNodeSpi(FAIL_ORDER);
spi.stopBeforeSndFail = true;
@@ -1831,7 +1844,7 @@
final int NODES = iter == 0 ? 2 : rnd.nextInt(3, 6);
for (int i = 0; i < NODES; i++) {
- nodeSpi.set(new TestFailedNodesSpi(-1));
+ nodeSpi.set(createFailedNodeSpi(-1));
startGrid(i);
}
@@ -2084,7 +2097,11 @@
TestRestoreConnectedSpi.startTest = false;
for (int i = 1; i < 5; i++) {
- nodeSpi.set(new TestRestoreConnectedSpi(3));
+ TestRestoreConnectedSpi spi = new TestRestoreConnectedSpi(3);
+
+ spi.setConnectionRecoveryTimeout(0);
+
+ nodeSpi.set(spi);
startGrid(i);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index ef582a5..1d10b4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.IgniteDiscoveryMassiveNodeFailTest;
import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
@@ -106,6 +107,8 @@
suite.addTest(new TestSuite(TcpDiscoverySpiReconnectDelayTest.class));
+ suite.addTest(new TestSuite(IgniteDiscoveryMassiveNodeFailTest.class));
+
// Client connect.
suite.addTest(new TestSuite(IgniteClientConnectTest.class));
suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class));