fixed slow exchange on topology startup
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index 505bc9d..3c6218d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -108,9 +108,16 @@
guard();
try {
+ GridEventConsumeHandler hnd = new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr,
+ (IgnitePredicate<Event>)rmtFilter, types);
+
return saveOrGet(ctx.continuous().startRoutine(
- new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr,
- (IgnitePredicate<Event>)rmtFilter, types), bufSize, interval, autoUnsubscribe, prj.predicate()));
+ hnd,
+ false,
+ bufSize,
+ interval,
+ autoUnsubscribe,
+ prj.predicate()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 17c06fc..2800777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -184,7 +184,12 @@
try {
GridContinuousHandler hnd = new GridMessageListenHandler(topic, (IgniteBiPredicate<UUID, Object>)p);
- return saveOrGet(ctx.continuous().startRoutine(hnd, 1, 0, false, prj.predicate()));
+ return saveOrGet(ctx.continuous().startRoutine(hnd,
+ false,
+ 1,
+ 0,
+ false,
+ prj.predicate()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 68ad8b6..5d74a6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1240,7 +1240,7 @@
}
/**
- * Gets a name of the grid, which is owner of current thread. An Exception is thrown if
+ * Gets the grid that owns the current thread. An exception is thrown if
* current thread is not an {@link IgniteThread}.
*
* @return Grid instance related to current thread
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c01f636..f9c33c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -648,6 +648,7 @@
UUID id = cctx.kernalContext().continuous().startRoutine(
hnd,
+ internal && loc,
bufSize,
timeInterval,
autoUnsubscribe,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index c6503e0..fd798df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -541,11 +541,13 @@
* @param bufSize Buffer size.
* @param interval Time interval.
* @param autoUnsubscribe Automatic unsubscribe flag.
+ * @param locOnly If {@code true}, handler is registered on the local node only and a finished future is returned.
* @param prjPred Projection predicate.
* @return Future.
*/
@SuppressWarnings("TooBroadScope")
public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd,
+ boolean locOnly,
int bufSize,
long interval,
boolean autoUnsubscribe,
@@ -554,12 +556,30 @@
assert bufSize > 0;
assert interval >= 0;
- // Whether local node is included in routine.
- boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode());
-
// Generate ID.
final UUID routineId = UUID.randomUUID();
+ // Register routine locally.
+ locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
+
+ if (locOnly) {
+ try {
+ registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true);
+
+ hnd.onListenerRegistered(routineId, ctx);
+
+ return new GridFinishedFuture<>(routineId);
+ }
+ catch (IgniteCheckedException e) {
+ unregisterHandler(routineId, hnd, true);
+
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ // Whether local node is included in routine.
+ boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode());
+
StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe);
try {
@@ -614,9 +634,6 @@
});
}
- // Register routine locally.
- locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
-
StartFuture fut = new StartFuture(ctx, routineId);
startFuts.put(routineId, fut);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
index 2473a9e..3dd4caf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteNodeValidationResult.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi;
import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Result of joining node validation.
@@ -63,4 +64,9 @@
public String sendMessage() {
return sndMsg;
}
-}
\ No newline at end of file
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteNodeValidationResult.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5c7ffbd..b082ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2570,6 +2570,9 @@
}
finally {
if (!success) {
+ if (log.isDebugEnabled())
+ log.debug("Closing socket to next: " + next);
+
U.closeQuiet(sock);
sock = null;
@@ -2733,6 +2736,9 @@
forceSndPending = false;
if (!sent) {
+ if (log.isDebugEnabled())
+ log.debug("Closing socket to next (not sent): " + next);
+
U.closeQuiet(sock);
sock = null;
@@ -2883,12 +2889,12 @@
*
* @param msg Join request message.
*/
- private void processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) {
+ private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) {
assert msg != null;
- TcpDiscoveryNode node = msg.node();
+ final TcpDiscoveryNode node = msg.node();
- UUID locNodeId = getLocalNodeId();
+ final UUID locNodeId = getLocalNodeId();
if (!msg.client()) {
boolean rmtHostLoopback = node.socketAddresses().size() == 1 &&
@@ -3096,132 +3102,151 @@
}
}
- IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
+ final IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node);
if (err != null) {
- boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
-
- if (!ping) {
- if (log.isDebugEnabled())
- log.debug("Conflicting node has already left, need to wait for event. " +
- "Will ignore join request for now since it will be recent [req=" + msg +
- ", err=" + err.message() + ']');
-
- // Ignore join request.
- return;
- }
-
- LT.warn(log, null, err.message());
-
- // Always output in debug.
if (log.isDebugEnabled())
- log.debug(err.message());
+ log.debug("Node validation failed [res=" + err + ", node=" + node + ']');
- try {
- trySendMessageDirectly(node,
- new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage()));
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send hash ID resolver validation failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']');
+ utilityPool.submit(
+ new Runnable() {
+ @Override public void run() {
+ boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
- onException("Failed to send hash ID resolver validation failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']', e);
- }
+ if (!ping) {
+ if (log.isDebugEnabled())
+ log.debug("Conflicting node has already left, need to wait for event. " +
+ "Will ignore join request for now since it will be recent [req=" + msg +
+ ", err=" + err.message() + ']');
+
+ // Ignore join request.
+ return;
+ }
+
+ LT.warn(log, null, err.message());
+
+ // Always output in debug.
+ if (log.isDebugEnabled())
+ log.debug(err.message());
+
+ try {
+ trySendMessageDirectly(node,
+ new TcpDiscoveryCheckFailedMessage(locNodeId, err.sendMessage()));
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send hash ID resolver validation failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']');
+
+ onException("Failed to send hash ID resolver validation failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']', e);
+ }
+ }
+ }
+ );
// Ignore join request.
return;
}
- String locMarsh = locNode.attribute(ATTR_MARSHALLER);
- String rmtMarsh = node.attribute(ATTR_MARSHALLER);
+ final String locMarsh = locNode.attribute(ATTR_MARSHALLER);
+ final String rmtMarsh = node.attribute(ATTR_MARSHALLER);
if (!F.eq(locMarsh, rmtMarsh)) {
- String errMsg = "Local node's marshaller differs from remote node's marshaller " +
- "(to make sure all nodes in topology have identical marshaller, " +
- "configure marshaller explicitly in configuration) " +
- "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(node) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
+ utilityPool.submit(
+ new Runnable() {
+ @Override public void run() {
+ String errMsg = "Local node's marshaller differs from remote node's marshaller " +
+ "(to make sure all nodes in topology have identical marshaller, " +
+ "configure marshaller explicitly in configuration) " +
+ "[locMarshaller=" + locMarsh + ", rmtMarshaller=" + rmtMarsh +
+ ", locNodeAddrs=" + U.addressesAsString(locNode) +
+ ", rmtNodeAddrs=" + U.addressesAsString(node) +
+ ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
- LT.warn(log, null, errMsg);
+ LT.warn(log, null, errMsg);
- // Always output in debug.
- if (log.isDebugEnabled())
- log.debug(errMsg);
+ // Always output in debug.
+ if (log.isDebugEnabled())
+ log.debug(errMsg);
- try {
- String sndMsg = "Local node's marshaller differs from remote node's marshaller " +
- "(to make sure all nodes in topology have identical marshaller, " +
- "configure marshaller explicitly in configuration) " +
- "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh +
- ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
- ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
- ", rmtNodeId=" + locNode.id() + ']';
+ try {
+ String sndMsg = "Local node's marshaller differs from remote node's marshaller " +
+ "(to make sure all nodes in topology have identical marshaller, " +
+ "configure marshaller explicitly in configuration) " +
+ "[locMarshaller=" + rmtMarsh + ", rmtMarshaller=" + locMarsh +
+ ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
+ ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
+ ", rmtNodeId=" + locNode.id() + ']';
- trySendMessageDirectly(node,
- new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']');
+ trySendMessageDirectly(node,
+ new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']');
- onException("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']', e);
- }
+ onException("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']', e);
+ }
+ }
+ }
+ );
// Ignore join request.
return;
}
// If node have no value for this attribute then we treat it as true.
- Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
+ final Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? true : locMarshUseDfltSuid;
- Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
+ final Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
boolean rmtMarshUseDfltSuidBool = rmtMarshUseDfltSuid == null ? true : rmtMarshUseDfltSuid;
if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) {
- String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
- " property value differs from remote node's value " +
- "(to make sure all nodes in topology have identical marshaller settings, " +
- "configure system property explicitly) " +
- "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(node) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
+ utilityPool.submit(new Runnable() {
+ @Override public void run() {
+ String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
+ " property value differs from remote node's value " +
+ "(to make sure all nodes in topology have identical marshaller settings, " +
+ "configure system property explicitly) " +
+ "[locMarshUseDfltSuid=" + locMarshUseDfltSuid + ", rmtMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
+ ", locNodeAddrs=" + U.addressesAsString(locNode) +
+ ", rmtNodeAddrs=" + U.addressesAsString(node) +
+ ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
- LT.warn(log, null, errMsg);
+ LT.warn(log, null, errMsg);
- // Always output in debug.
- if (log.isDebugEnabled())
- log.debug(errMsg);
+ // Always output in debug.
+ if (log.isDebugEnabled())
+ log.debug(errMsg);
- try {
- String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
- " property value differs from remote node's value " +
- "(to make sure all nodes in topology have identical marshaller settings, " +
- "configure system property explicitly) " +
- "[locMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
- ", rmtMarshUseDfltSuid=" + locMarshUseDfltSuid +
- ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
- ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
- ", rmtNodeId=" + locNode.id() + ']';
+ try {
+ String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
+ " property value differs from remote node's value " +
+ "(to make sure all nodes in topology have identical marshaller settings, " +
+ "configure system property explicitly) " +
+ "[locMarshUseDfltSuid=" + rmtMarshUseDfltSuid +
+ ", rmtMarshUseDfltSuid=" + locMarshUseDfltSuid +
+ ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
+ ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
+ ", rmtNodeId=" + locNode.id() + ']';
- trySendMessageDirectly(node,
- new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']');
+ trySendMessageDirectly(node,
+ new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']');
- onException("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']', e);
- }
+ onException("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']', e);
+ }
+ }
+ });
// Ignore join request.
return;
@@ -3229,45 +3254,51 @@
// Validate compact footer flags.
Boolean locMarshCompactFooter = locNode.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
- boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false;
+ final boolean locMarshCompactFooterBool = locMarshCompactFooter != null ? locMarshCompactFooter : false;
Boolean rmtMarshCompactFooter = node.attribute(ATTR_MARSHALLER_COMPACT_FOOTER);
- boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;
+ final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;
if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) {
- String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
- "the same property on remote node (make sure all nodes in topology have the same value " +
- "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool +
- ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(node) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
+ utilityPool.submit(
+ new Runnable() {
+ @Override public void run() {
+ String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
+ "the same property on remote node (make sure all nodes in topology have the same value " +
+ "of \"compactFooter\" property) [locMarshallerCompactFooter=" + locMarshCompactFooterBool +
+ ", rmtMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
+ ", locNodeAddrs=" + U.addressesAsString(locNode) +
+ ", rmtNodeAddrs=" + U.addressesAsString(node) +
+ ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
- LT.warn(log, null, errMsg);
+ LT.warn(log, null, errMsg);
- // Always output in debug.
- if (log.isDebugEnabled())
- log.debug(errMsg);
+ // Always output in debug.
+ if (log.isDebugEnabled())
+ log.debug(errMsg);
- try {
- String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
- "the same property on remote node (make sure all nodes in topology have the same value " +
- "of \"compactFooter\" property) [locMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
- ", rmtMarshallerCompactFooter=" + locMarshCompactFooterBool +
- ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
- ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
- ", rmtNodeId=" + locNode.id() + ']';
+ try {
+ String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
+ "the same property on remote node (make sure all nodes in topology have the same value " +
+ "of \"compactFooter\" property) [locMarshallerCompactFooter=" + rmtMarshCompactFooterBool +
+ ", rmtMarshallerCompactFooter=" + locMarshCompactFooterBool +
+ ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
+ ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
+ ", rmtNodeId=" + locNode.id() + ']';
- trySendMessageDirectly(node, new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']');
+ trySendMessageDirectly(node, new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']');
- onException("Failed to send marshaller check failed message to node " +
- "[node=" + node + ", err=" + e.getMessage() + ']', e);
- }
+ onException("Failed to send marshaller check failed message to node " +
+ "[node=" + node + ", err=" + e.getMessage() + ']', e);
+ }
+ }
+ }
+ );
// Ignore join request.
return;
@@ -5823,6 +5854,16 @@
* @param msg Message to add.
*/
void addMessage(TcpDiscoveryAbstractMessage msg) {
+ if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
+ msg instanceof TcpDiscoveryJoinRequestMessage ||
+ msg instanceof TcpDiscoveryCustomEventMessage) &&
+ queue.contains(msg)) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring duplicate message: " + msg);
+
+ return;
+ }
+
if (msg.highPriority())
queue.addFirst(msg);
else
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 362fa2f..29f664e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -606,4 +606,4 @@
@Override public String toString() {
return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient());
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 9cb47af..24f2a5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -274,7 +274,7 @@
}
/** {@inheritDoc} */
- @Override public final boolean equals(Object obj) {
+ @Override public boolean equals(Object obj) {
if (this == obj)
return true;
else if (obj instanceof TcpDiscoveryAbstractMessage)
@@ -292,4 +292,4 @@
@Override public String toString() {
return S.toString(TcpDiscoveryAbstractMessage.class, this, "isClient", getFlag(CLIENT_FLAG_POS));
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 7776e72..ca5dd56 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,8 +18,8 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.NotNull;
@@ -92,7 +92,16 @@
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return super.equals(obj) &&
+ obj instanceof TcpDiscoveryCustomEventMessage &&
+ F.eq(
+ ((TcpDiscoveryCustomEventMessage)obj).verifierNodeId(),
+ verifierNodeId());
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 2586a8b..22ffae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Map;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -79,7 +80,20 @@
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ // NOTE: super.equals() is skipped on purpose (message IDs differ); only nodes are compared.
+ // NOTE(review): no hashCode() override accompanies this in the patch - confirm consistency.
+
+ if (!(obj instanceof TcpDiscoveryJoinRequestMessage))
+ return false;
+
+ TcpDiscoveryJoinRequestMessage other = (TcpDiscoveryJoinRequestMessage)obj;
+
+ return F.eqNodes(other.node, node);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
index 70b0080..fdbeb75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -109,7 +110,22 @@
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ // NOTE: super.equals() is skipped on purpose (message IDs differ); equality is decided by
+ // creator node, failed node ID and status. NOTE(review): no hashCode() override here - confirm.
+
+ if (!(obj instanceof TcpDiscoveryStatusCheckMessage))
+ return false;
+
+ TcpDiscoveryStatusCheckMessage other = (TcpDiscoveryStatusCheckMessage)obj;
+
+ return F.eqNodes(other.creatorNode, creatorNode) &&
+ F.eq(other.failedNodeId, failedNodeId) &&
+ status == other.status;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString());
}
-}
\ No newline at end of file
+}