IGNITE-17507 Fixed an issue that could lead to unexpected partition map exchange on client nodes. Fixes #10187
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 3d9bfcc..29aa940 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -50,9 +50,8 @@
}
/** {@inheritDoc} */
- @Deprecated
@Override public boolean stopProcess() {
- return false;
+ return delegate.stopProcess();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index 2309fc0..c3cd0ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.jetbrains.annotations.Nullable;
@@ -92,6 +93,15 @@
public boolean isMutable();
/**
+ * See {@link DiscoverySpiCustomMessage#stopProcess()}.
+ *
+ * @return {@code True} if message should not be sent to other nodes after it was processed on the coordinator.
+ */
+ public default boolean stopProcess() {
+ return false;
+ }
+
+ /**
* Creates new discovery cache if message caused topology version change.
*
* @param mgr Discovery manager.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index 708e3c1..1330e88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -28,6 +28,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
import org.jetbrains.annotations.Nullable;
/**
@@ -55,8 +56,18 @@
/** */
private GridDhtPartitionsFullMessage partsMsg;
- /** */
- private transient boolean exchangeNeeded;
+ /** If this flag is {@code true} then this message should lead to partition map exchange. */
+ private boolean exchangeNeeded;
+
+ /**
+ * This flag indicates that this message should not be passed to other nodes except the coordinator.
+ * Instead of this message, the message which is returned by {@link #ackMessage()} will be sent.
+ * See {@link DiscoveryCustomMessage#stopProcess()}.
+ *
+ * This flag is used when discovery SPI does not support mutable custom messages.
+ * See {@link DiscoverySpiMutableCustomMessageSupport}.
+ */
+ private transient boolean stopProc;
/**
* Constructor used when message is created after cache rebalance finished.
@@ -76,7 +87,8 @@
* @param partsMsg Partitions messages.
* @param assignmentChange Assignment change.
*/
- public CacheAffinityChangeMessage(GridDhtPartitionExchangeId exchId,
+ public CacheAffinityChangeMessage(
+ GridDhtPartitionExchangeId exchId,
GridDhtPartitionsFullMessage partsMsg,
Map<Integer, Map<Integer, List<UUID>>> assignmentChange) {
this.exchId = exchId;
@@ -140,17 +152,44 @@
/** {@inheritDoc} */
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return null;
+ if (!stopProc)
+ return null;
+
+ // If stopProc is equal to true, then Discovery SPI does not support mutable custom messages.
+ // Let's return the same message that was mutated on the coordinator node. This message will be sent to all nodes
+ // instead of the original one.
+ return this;
}
/** {@inheritDoc} */
@Override public boolean isMutable() {
- return false;
+ return true;
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
- AffinityTopologyVersion topVer, DiscoCache discoCache) {
+ @Override public boolean stopProcess() {
+ return stopProc;
+ }
+
+ /**
+ * Sets stop processing flag. If this flag is {@code true} then this message is not passed to other nodes after
+ * the coordinator node notified its own listener. If method {@link #ackMessage()} returns non-null ack message,
+ * it is sent to all nodes.
+ * This flag is used when discovery SPI does not support mutable custom messages.
+ * See {@link DiscoverySpiMutableCustomMessageSupport}.
+ *
+ * @param stopProc If {@code true} then this message is not passed to other nodes.
+ */
+ public void stopProcess(boolean stopProc) {
+ this.stopProc = stopProc;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoCache createDiscoCache(
+ GridDiscoveryManager mgr,
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache
+ ) {
return discoCache.copy(topVer, null);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f6f4a95..fda6598 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -77,6 +77,7 @@
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -219,11 +220,21 @@
return false;
}
+ boolean isClient = cctx.discovery().localNode().isClient();
+
+ if (cctx.kernalContext().config().getDiscoverySpi() instanceof TcpDiscoverySpi)
+ isClient &= !((TcpDiscoverySpi)cctx.kernalContext().config().getDiscoverySpi()).isForceServerMode();
+
// Skip message if affinity was already recalculated.
- boolean exchangeNeeded = lastAffVer == null || lastAffVer.equals(msg.topologyVersion());
+ // Client node should just accept the flag from the mutated message.
+ boolean exchangeNeeded = (isClient) ? msg.exchangeNeeded() :
+ lastAffVer == null || lastAffVer.equals(msg.topologyVersion());
msg.exchangeNeeded(exchangeNeeded);
+ if (!cctx.discovery().mutableCustomMessages() && !isClient)
+ msg.stopProcess(true);
+
if (exchangeNeeded) {
if (log.isDebugEnabled()) {
log.debug("Need process affinity change message [lastAffVer=" + lastAffVer +
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index afd7093..6e11673 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -45,10 +45,9 @@
* then message is not passed to others nodes, if after this method {@link #ackMessage()} returns non-null ack
* message, it is sent to all nodes.
*
- * @return {@code True} if message should not be sent to all nodes.
+ * Note: this method is used if and only if the ZooKeeper discovery is configured.
*
- * @deprecated Is not used anymore and will be removed at 3.0.
+ * @return {@code True} if message should not be sent to all nodes.
*/
- @Deprecated
public boolean stopProcess();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 0309447..d50c2c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -105,7 +105,10 @@
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.blockSingleExhangeMessage;
import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_VER_1;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
*
@@ -1352,7 +1355,7 @@
Ignite ignite = grids.get(i);
if (!blocked.contains(ignite.name())) {
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return fut.isDone();
}
@@ -1527,7 +1530,7 @@
});
}
- IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>() {
+ IgniteInternalFuture<?> stopFut = runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
for (int j = 1; j < NODES; j++) {
TestRecordingCommunicationSpi spi =
@@ -1561,6 +1564,106 @@
}
/**
+ * Wait for rebalance, send affinity change message, but affinity already changed
+ * (new nodes joined: server + client). Checks that there is no race that could lead to
+ * unexpected partition map exchange on the client node.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDelayAssignmentAffinityChangedUnexpectedPME() throws Exception {
+ Ignite ignite0 = startServer(0, 1);
+
+ for (int i = 0; i < 1024; i++)
+ ignite0.cache(CACHE_NAME1).put(i, i);
+
+ DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+ ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
+
+ TestRecordingCommunicationSpi commSpi0 =
+ (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+ // Starting a new client node should not lead to a rebalance obviously.
+ // So, it is expected that data distribution is ideal (ideal assignment).
+ startClient(1, 2);
+
+ checkAffinity(2, topVer(2, 0), true);
+
+ // Block late affinity assignment. (*)
+ lsnr.blockCustomEvent(CacheAffinityChangeMessage.class);
+
+ // Starting a new server node triggers data rebalancing.
+ // [3, 0] - is not ideal (expected)
+ startServer(2, 3);
+
+ checkAffinity(3, topVer(3, 0), false);
+
+ // Wait for sending late affinity assignment message (1) from the coordinator node.
+ // This message will be blocked (*)
+ lsnr.waitCustomEvent();
+
+ // Block rebalance messages.
+ blockSupplySend(commSpi0, CACHE_NAME1);
+
+ // Starting a new server node means that the late affinity assignment message (1) should be skipped.
+ startServer(3, 4);
+
+ TestRecordingCommunicationSpi clientSpi = new TestRecordingCommunicationSpi();
+ clientSpi.blockMessages(blockSingleExhangeMessage());
+ spiC = igniteInstanceName -> clientSpi;
+
+ IgniteInternalFuture<?> startClientFut = runAsync(() -> {
+ startClient(4, 5);
+ });
+
+ clientSpi.waitForBlocked();
+
+ // Unblock the late affinity assignment message (1).
+ lsnr.stopBlockCustomEvents();
+
+ clientSpi.stopBlock();
+
+ startClientFut.get(15_000);
+
+ // [5, 0] - is not ideal (expected)
+ checkAffinity(5, topVer(5, 0), false);
+
+ // Rebalance is blocked at this moment, so [5, 1] is not ready.
+ checkNoExchange(5, topVer(5, 1));
+
+ // Unblock rebalancing.
+ // The late affinity assignments message (2) should be fired after all.
+ commSpi0.stopBlock();
+
+ // [5, 1] should be ideal
+ checkAffinity(5, topVer(5, 1), true);
+
+ // The following output demonstrates the issue.
+ // The coordinator node and client initiate PME on the same topology version,
+ // but they rely on different custom messages.
+ // client:
+ // Started exchange init [
+ // topVer=AffinityTopologyVersion [topVer=5, minorTopVer=1],
+ // crd=false,
+ // evt=DISCOVERY_CUSTOM_EVT, evtNode=00ac9434-fd34-4aae-95d3-ceb477700000,
+ // customEvt=CacheAffinityChangeMessage [
+ // id=3ccc8984181-ea41279c-71cb-4b8c-8b48-1dee1baa6fe0, <<< (1)
+ // topVer=AffinityTopologyVersion [topVer=3, minorTopVer=0], ...] <<< !!!
+ // coordinator:
+ // Started exchange init
+ // [topVer=AffinityTopologyVersion [topVer=5, minorTopVer=1],
+ // crd=true,
+ // evt=DISCOVERY_CUSTOM_EVT, evtNode=00ac9434-fd34-4aae-95d3-ceb477700000,
+ // customEvt=CacheAffinityChangeMessage [
+ // id=d2ec8984181-ea41279c-71cb-4b8c-8b48-1dee1baa6fe0, <<< (2)
+ // topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0], ...] <<< !!!
+ awaitPartitionMapExchange(true, true, null, false);
+
+ assertPartitionsSame(idleVerify(grid(0), CACHE_NAME1));
+ }
+
+ /**
* Wait for rebalance, send affinity change message, but affinity already changed (new node joined).
*
* @throws Exception If failed.
@@ -1657,7 +1760,7 @@
lsnr.stopBlockCustomEvents();
- boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ boolean started = waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return startedFuture.isDone();
}
@@ -2174,7 +2277,7 @@
//Ensure exchanges merge.
spiC = igniteInstanceName -> testSpis[getTestIgniteInstanceIndex(igniteInstanceName)];
- GridTestUtils.runAsync(() -> {
+ runAsync(() -> {
try {
for (int j = 1; j < NODES; j++)
testSpis[j].waitForBlocked();
@@ -2321,7 +2424,7 @@
}
}, NODES, "update-thread");
- IgniteInternalFuture<?> srvRestartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ IgniteInternalFuture<?> srvRestartFut = runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
while (!fail.get() && System.currentTimeMillis() < stopTime) {
Ignite node = startGrid(NODES);
@@ -2462,7 +2565,7 @@
final ClusterNode srvcNode = affinity.get(part).get(0);
- boolean wait = GridTestUtils.waitForCondition(new PA() {
+ boolean wait = waitForCondition(new PA() {
@Override public boolean apply() {
TestService srvc = grid(srvcNode).services().service(srvcName);
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 41a4761..9b7476c 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -58,7 +58,6 @@
}
/** {@inheritDoc} */
- @Deprecated
@Override public boolean stopProcess() {
return false;
}
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
index 4c8d996..0c79c36 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -50,7 +50,6 @@
}
/** {@inheritDoc} */
- @Deprecated
@Override public boolean stopProcess() {
return false;
}
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
index bd6e1e8..de7291c 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -54,7 +54,6 @@
}
/** {@inheritDoc} */
- @Deprecated
@Override public boolean stopProcess() {
return false;
}
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
index 957310d..626fe74 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -39,7 +39,6 @@
}
/** {@inheritDoc} */
- @Deprecated
@Override public boolean stopProcess() {
return false;
}
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 97f7b86..5225c9a 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -2588,11 +2588,39 @@
if (log.isDebugEnabled())
log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']');
+ boolean fastStopProcess = false;
+
if (msg instanceof ZkInternalMessage)
processInternalMessage(evtData, (ZkInternalMessage)msg);
- else
+ else {
notifyCustomEvent(evtData, msg);
+ if (msg.stopProcess()) {
+ if (log.isDebugEnabled())
+ log.debug("Fast stop process custom event [evt=" + evtData + ", msg=" + msg + ']');
+
+ fastStopProcess = true;
+
+ // No need to process this event on other nodes, skip this event.
+ evtsData.evts.remove(evtData.eventId());
+
+ evtsData.evtIdGen--;
+
+ DiscoverySpiCustomMessage ack = msg.ackMessage();
+
+ if (ack != null) {
+ evtData = createAckEvent(ack, evtData);
+
+ if (log.isDebugEnabled())
+ log.debug("Generated CUSTOM event (ack for fast stop process) [evt=" + evtData + ", msg=" + msg + ']');
+
+ notifyCustomEvent(evtData, ack);
+ }
+ else
+ evtData = null;
+ }
+ }
+
if (evtData != null) {
evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
@@ -2600,6 +2628,9 @@
saveAndProcessNewEvents();
+ if (fastStopProcess)
+ deleteCustomEventDataAsync(zkClient, evtPath);
+
if (failedNode != null) {
deleteAliveNode(failedNode.internalId());